1 /* 2 * #%L 3 * Aggregate.java - mongodb-async-driver - Allanbank Consulting, Inc. 4 * %% 5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. 6 * %% 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * #L% 19 */ 20 21 package com.allanbank.mongodb.builder; 22 23 import java.util.ArrayList; 24 import java.util.Arrays; 25 import java.util.Collections; 26 import java.util.List; 27 import java.util.concurrent.TimeUnit; 28 29 import com.allanbank.mongodb.MongoCollection; 30 import com.allanbank.mongodb.ReadPreference; 31 import com.allanbank.mongodb.Version; 32 import com.allanbank.mongodb.bson.DocumentAssignable; 33 import com.allanbank.mongodb.bson.Element; 34 import com.allanbank.mongodb.bson.builder.ArrayBuilder; 35 import com.allanbank.mongodb.bson.builder.BuilderFactory; 36 import com.allanbank.mongodb.bson.builder.DocumentBuilder; 37 import com.allanbank.mongodb.bson.element.ArrayElement; 38 import com.allanbank.mongodb.bson.element.DocumentElement; 39 import com.allanbank.mongodb.bson.element.IntegerElement; 40 import com.allanbank.mongodb.builder.expression.Expression; 41 import com.allanbank.mongodb.builder.expression.Expressions; 42 43 /** 44 * Aggregate provides support for the <tt>aggregate</tt> command supporting a 45 * pipeline of commands to execute. 46 * <p> 47 * Instances of this class are constructed via the inner {@link Builder} class. 48 * Due to the potential complexity of pipelines and the associated operators the 49 * <tt>Builder</tt> is intended to be used with the various support classes 50 * including the {@link Expressions} library. For example:<blockquote> 51 * 52 * <pre> 53 * <code> 54 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set}; 55 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id}; 56 * import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId}; 57 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where}; 58 * import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc}; 59 * import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc}; 60 * import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field}; 61 * import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set}; 62 * 63 * DocumentBuilder b1 = BuilderFactory.start(); 64 * DocumentBuilder b2 = BuilderFactory.start(); 65 * Aggregate.Builder builder = new Aggregate.Builder(); 66 * 67 * builder.match(where("state").notEqualTo("NZ")) 68 * .group(id().addField("state") 69 * .addField("city"), 70 * set("pop").sum("pop")) 71 * .sort(asc("pop")) 72 * .group(id("_id.state"), 73 * set("biggestcity").last("_id.city"), 74 * set("biggestpop").last("pop"), 75 * set("smallestcity").first("_id.city"), 76 * set("smallestpop").first("pop")) 77 * .project( 78 * includeWithoutId(), 79 * set("state", field("_id")), 80 * set("biggestCity", 81 * b1.add(set("name", field("biggestcity"))).add( 82 * set("pop", field("biggestpop")))), 83 * set("smallestCity", 84 * b2.add(set("name", field("smallestcity"))).add( 85 * set("pop", field("smallestpop"))))) 86 * .sort(desc("biggestCity.pop")); 87 * </code> 88 * </pre> 89 * 90 * </blockquote> 91 * </p> 92 * 93 * 94 * @see <a 95 * href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example 96 * Inspired By</a> 97 * @api.yes This class is part of the driver's API. Public and protected members 98 * will be deprecated for at least 1 non-bugfix release (version 99 * numbers are <major>.<minor>.<bugfix>) before being 100 * removed or modified. 101 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved 102 */ 103 public class Aggregate { 104 105 /** 106 * The first version of MongoDB to support the {@code $geoNear} pipeline 107 * operator. 108 */ 109 public static final Version ALLOW_DISK_USAGE_REQUIRED_VERSION = Version 110 .parse("2.6"); 111 112 /** 113 * The first version of MongoDB to support the {@code aggregate} command 114 * using a cursor. 115 */ 116 public static final Version CURSOR_VERSION = Version.parse("2.5.2"); 117 118 /** 119 * The first version of MongoDB to support the {@code aggregate} command 120 * with the explain option. 121 */ 122 public static final Version EXPLAIN_VERSION = Version.parse("2.5.3"); 123 124 /** 125 * The first version of MongoDB to support the {@code $geoNear} pipeline 126 * operator. 127 */ 128 public static final Version GEO_NEAR_REQUIRED_VERSION = Version.VERSION_2_4; 129 130 /** 131 * The first version of MongoDB to support the {@code aggregate} command 132 * with the ability to limit the execution time on the server. 133 */ 134 public static final Version MAX_TIMEOUT_VERSION = Find.MAX_TIMEOUT_VERSION; 135 136 /** 137 * The first version of MongoDB to support the {@code $redact} pipeline 138 * operator. 139 */ 140 public static final Version REDACT_REQUIRED_VERSION = Version 141 .parse("2.5.2"); 142 143 /** The first version of MongoDB to support the {@code aggregate} command. */ 144 public static final Version REQUIRED_VERSION = Version.parse("2.1.0"); 145 146 /** 147 * Creates a new builder for a {@link Aggregate}. 148 * 149 * @return The builder to construct a {@link Aggregate}. 150 */ 151 public static Builder builder() { 152 return new Builder(); 153 } 154 155 /** 156 * Set to true if the aggregation results should be allowed to spill to 157 * disk. 158 */ 159 private final boolean myAllowDiskUsage; 160 161 /** The number of documents to be returned in each batch of results. */ 162 private final int myBatchSize; 163 164 /** The total number of documents to be returned. */ 165 private final int myLimit; 166 167 /** The maximum amount of time to allow the command to run. */ 168 private final long myMaximumTimeMilliseconds; 169 170 /** The pipeline of operations to be applied. */ 171 private final List<Element> myPipeline; 172 173 /** The read preference to use. */ 174 private final ReadPreference myReadPreference; 175 176 /** The version required for the aggregation. */ 177 private final Version myRequiredVersion; 178 179 /** Set to true if the aggregation results should be returned as a cursor. */ 180 private final boolean myUseCursor; 181 182 /** 183 * Creates a new Aggregation. 184 * 185 * @param builder 186 * The builder for the Aggregation instance. 187 */ 188 protected Aggregate(final Builder builder) { 189 myPipeline = Collections.unmodifiableList(Arrays 190 .asList(builder.myPipeline.build())); 191 myBatchSize = builder.myBatchSize; 192 myLimit = builder.myLimit; 193 myUseCursor = builder.myUseCursor; 194 myAllowDiskUsage = builder.myAllowDiskUsage; 195 myReadPreference = builder.myReadPreference; 196 myRequiredVersion = builder.myRequiredVersion; 197 myMaximumTimeMilliseconds = builder.myMaximumTimeMilliseconds; 198 } 199 200 /** 201 * Returns the number of documents to be returned in each batch of results 202 * by the cursor. 203 * 204 * @return The number of documents to be returned in each batch of results 205 * by the cursor. 206 */ 207 public int getBatchSize() { 208 return myBatchSize; 209 } 210 211 /** 212 * Returns the total number of documents to be returned by the cursor. 213 * 214 * @return The total number of documents to be returned the cursor. 215 */ 216 public int getCursorLimit() { 217 return myLimit; 218 } 219 220 /** 221 * Returns the maximum amount of time to allow the command to run on the 222 * Server before it is aborted. 223 * 224 * @return The maximum amount of time to allow the command to run on the 225 * Server before it is aborted. 226 * 227 * @since MongoDB 2.6 228 */ 229 public long getMaximumTimeMilliseconds() { 230 return myMaximumTimeMilliseconds; 231 } 232 233 /** 234 * Returns the pipeline of operations to apply. 235 * 236 * @return The pipeline of operations to apply. 237 */ 238 public List<Element> getPipeline() { 239 return myPipeline; 240 } 241 242 /** 243 * Returns the {@link ReadPreference} specifying which servers may be used 244 * to execute the aggregation. 245 * <p> 246 * If <code>null</code> then the {@link MongoCollection} instance's 247 * {@link ReadPreference} will be used. 248 * </p> 249 * 250 * @return The read preference to use. 251 * 252 * @see MongoCollection#getReadPreference() 253 */ 254 public ReadPreference getReadPreference() { 255 return myReadPreference; 256 } 257 258 /** 259 * Returns the version required for the aggregation. 260 * 261 * @return The version required for the aggregation. 262 */ 263 public Version getRequiredVersion() { 264 return myRequiredVersion; 265 } 266 267 /** 268 * Returns true if the aggregation results should be allowed to spill to 269 * disk. 270 * 271 * @return True if the aggregation results should be allowed to spill to 272 * disk. 273 */ 274 public boolean isAllowDiskUsage() { 275 return myAllowDiskUsage; 276 } 277 278 /** 279 * Returns true if the aggregation results should be returned as a cursor. 280 * 281 * @return True if the aggregation results should be returned as a cursor. 282 */ 283 public boolean isUseCursor() { 284 return myUseCursor; 285 } 286 287 /** 288 * Builder provides the ability to construct aggregate command pipelines. 289 * <p> 290 * Methods are provided for all existing pipeline operators and generic 291 * {@link #step} methods are provided to support future pipeline operators 292 * while in development or before the driver is updated. 293 * </p> 294 * <p> 295 * This builder is intended to be used with the various support classes 296 * including the {@link Expressions} library. For example:<blockquote> 297 * 298 * <pre> 299 * <code> 300 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set}; 301 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id}; 302 * import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId}; 303 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where}; 304 * import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc}; 305 * import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc}; 306 * import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field}; 307 * import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set}; 308 * 309 * DocumentBuilder b1 = BuilderFactory.start(); 310 * DocumentBuilder b2 = BuilderFactory.start(); 311 * Aggregation.Builder builder = new Aggregation.Builder(); 312 * 313 * builder.match(where("state").notEqualTo("NZ")) 314 * .group(id().addField("state") 315 * .addField("city"), 316 * set("pop").sum("pop")) 317 * .sort(asc("pop")) 318 * .group(id("_id.state"), 319 * set("biggestcity").last("_id.city"), 320 * set("biggestpop").last("pop"), 321 * set("smallestcity").first("_id.city"), 322 * set("smallestpop").first("pop")) 323 * .project( 324 * includeWithoutId(), 325 * set("state", field("_id")), 326 * set("biggestCity", 327 * b1.add(set("name", field("biggestcity"))).add( 328 * set("pop", field("biggestpop")))), 329 * set("smallestCity", 330 * b2.add(set("name", field("smallestcity"))).add( 331 * set("pop", field("smallestpop"))))) 332 * .sort(desc("biggestCity.pop")); 333 * </code> 334 * </pre> 335 * 336 * </blockquote> 337 * </p> 338 * 339 * @see <a 340 * href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example 341 * Inspired By</a> 342 * @api.yes This class is part of the driver's API. Public and protected 343 * members will be deprecated for at least 1 non-bugfix release 344 * (version numbers are <major>.<minor>.<bugfix>) 345 * before being removed or modified. 346 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved 347 */ 348 public static class Builder { 349 350 /** 351 * Set to true if the aggregation results should be allowed to spill to 352 * disk. 353 */ 354 protected boolean myAllowDiskUsage; 355 356 /** The number of documents to be returned in each batch of results. */ 357 protected int myBatchSize; 358 359 /** The total number of documents to be returned. */ 360 protected int myLimit; 361 362 /** The maximum amount of time to allow the command to run. */ 363 protected long myMaximumTimeMilliseconds; 364 365 /** The pipeline of operations to be applied. */ 366 protected final ArrayBuilder myPipeline; 367 368 /** The read preference to use. */ 369 protected ReadPreference myReadPreference; 370 371 /** 372 * The version required for the aggregation. This is computed 373 * automatically based on the pipeline constructed. 374 */ 375 protected Version myRequiredVersion; 376 377 /** 378 * Set to true if the aggregation results should be returned as a 379 * cursor. 380 */ 381 protected boolean myUseCursor; 382 383 /** 384 * Creates a new Builder. 385 */ 386 public Builder() { 387 myPipeline = BuilderFactory.startArray(); 388 reset(); 389 } 390 391 /** 392 * Allows the aggregation command can spill to disk. 393 * <p> 394 * This method delegates to {@link #allowDiskUsage(boolean) 395 * allowDiskUsage(true)}. 396 * </p> 397 * <p> 398 * This method also sets the builder to use a cursor to true. 399 * </p> 400 * 401 * @return This builder for chaining method calls. 402 */ 403 public Builder allowDiskUsage() { 404 return allowDiskUsage(true); 405 } 406 407 /** 408 * Sets to true if the aggregation command can spill to disk. 409 * <p> 410 * This method delegates to {@link #setAllowDiskUsage(boolean)}. 411 * </p> 412 * <p> 413 * This method also sets the builder to use a cursor to true. 414 * </p> 415 * 416 * @param allowDiskUsage 417 * The new value for if the aggregation command can spill to 418 * disk. 419 * @return This builder for chaining method calls. 420 */ 421 public Builder allowDiskUsage(final boolean allowDiskUsage) { 422 return setAllowDiskUsage(allowDiskUsage); 423 } 424 425 /** 426 * Sets the value of the number of documents to be returned in each 427 * batch. 428 * <p> 429 * This method delegates to {@link #setBatchSize(int)}. 430 * </p> 431 * <p> 432 * This method also sets the builder to use a cursor to true. 433 * </p> 434 * 435 * @param batchSize 436 * The new value for the number of documents to be returned 437 * in each batch. 438 * @return This builder for chaining method calls. 439 */ 440 public Builder batchSize(final int batchSize) { 441 return setBatchSize(batchSize); 442 } 443 444 /** 445 * Constructs a new {@link Aggregate} object from the state of the 446 * builder. 447 * 448 * @return The new {@link Aggregate} object. 449 */ 450 public Aggregate build() { 451 return new Aggregate(this); 452 } 453 454 /** 455 * Sets the value of the total number of documents to be returned. 456 * <p> 457 * This method delegates to {@link #setCusorLimit(int)}. 458 * </p> 459 * <p> 460 * This method also sets the builder to use a cursor to true. 461 * </p> 462 * 463 * @param limit 464 * The new value for the total number of documents to be 465 * returned. 466 * @return This builder for chaining method calls. 467 */ 468 public Builder cursorLimit(final int limit) { 469 return setCusorLimit(limit); 470 } 471 472 /** 473 * Adds a <tt>$geoNear</tt> operation to the pipeline to select 474 * documents for the aggregation pipeline based on their relative 475 * location to a set point. The <tt>$geoNear</tt> must be the first 476 * option in the aggregation pipeline. <blockquote> 477 * 478 * <pre> 479 * <code> 480 * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear}; 481 * 482 * {@link Aggregate.Builder} builder = new Aggregation.Builder(); 483 * builder.geoNear( AggregationGeoNear.builder() 484 * .location( new Point( 1, 2 ) ) 485 * .distanceLocationField( "stats.distance" ) 486 * .limit( 5 ).build() ); 487 * </code> 488 * </pre> 489 * 490 * </blockquote> 491 * 492 * @param geoNear 493 * The options for the GeoNear operation. 494 * @return This builder for chaining method calls. 495 * 496 * @since MongoDB 2.4 497 */ 498 public Builder geoNear(final AggregationGeoNear geoNear) { 499 myRequiredVersion = Version.later(myRequiredVersion, 500 GEO_NEAR_REQUIRED_VERSION); 501 502 return step("$geoNear", geoNear.asDocument()); 503 } 504 505 /** 506 * Adds a <tt>$geoNear</tt> operation to the pipeline to select 507 * documents for the aggregation pipeline based on their relative 508 * location to a set point. The <tt>$geoNear</tt> must be the first 509 * option in the aggregation pipeline. <blockquote> 510 * 511 * <pre> 512 * <code> 513 * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear}; 514 * 515 * {@link Aggregate.Builder} builder = new Aggregation.Builder(); 516 * builder.geoNear( AggregationGeoNear.builder() 517 * .location( new Point( 1, 2 ) ) 518 * .distanceLocationField( "stats.distance" ) 519 * .limit( 5 ) ); 520 * </code> 521 * </pre> 522 * 523 * </blockquote> 524 * 525 * @param geoNear 526 * The options for the GeoNear operation. 527 * @return This builder for chaining method calls. 528 * 529 * @since MongoDB 2.4 530 */ 531 public Builder geoNear(final AggregationGeoNear.Builder geoNear) { 532 return geoNear(geoNear.build()); 533 } 534 535 /** 536 * Adds a <tt>$group</tt> operation to the pipeline to aggregate 537 * documents passing this point in the pipeline into a group of 538 * documents. 539 * <p> 540 * This method is intended to construct groups with simple dynamic or 541 * static id documents. 542 * </p> 543 * <blockquote> 544 * 545 * <pre> 546 * <code> 547 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id}; 548 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set}; 549 * 550 * {@link Aggregate.Builder} builder = new Aggregation.Builder(); 551 * builder.group( 552 * id("$field1"), 553 * set("resultField1").uniqueValuesOf("$field2"), 554 * set("resultField2").max("$field3"), 555 * set("sum").sum("$field4") ); 556 * </code> 557 * </pre> 558 * 559 * </blockquote> 560 * 561 * @param id 562 * The builder for the <tt>_id</tt> field to specify unique 563 * groups. 564 * @param aggregations 565 * The specification for the group id and what fields to 566 * aggregate in the form of a document. 567 * @return This builder for chaining method calls. 568 */ 569 public Builder group(final AggregationGroupId id, 570 final AggregationGroupField... aggregations) { 571 572 final Element[] elements = new Element[aggregations.length + 1]; 573 elements[0] = id.toElement(); 574 for (int i = 0; i < aggregations.length; ++i) { 575 elements[i + 1] = aggregations[i].toElement(); 576 } 577 578 return step("$group", elements); 579 } 580 581 /** 582 * Adds a <tt>$group</tt> operation to the pipeline to aggregate 583 * documents passing this point in the pipeline into a group of 584 * documents. 585 * <p> 586 * This method is intended to construct groups with complex dynamic or 587 * static id documents. The {@link AggregationGroupId.Builder} 588 * implements the {@link DocumentBuilder} for construction of arbitrary 589 * complex _id documents. 590 * </p> 591 * <blockquote> 592 * 593 * <pre> 594 * <code> 595 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id}; 596 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set}; 597 * 598 * {@link Aggregate.Builder} builder = new Aggregation.Builder(); 599 * builder.group( 600 * id().addField("$field1").addField("$field2"), 601 * set("resultField1").uniqueValuesOf("$field3"), 602 * set("resultField2").first("$field4"), 603 * set("count").count() ); 604 * </code> 605 * </pre> 606 * 607 * </blockquote> 608 * 609 * @param id 610 * The builder for the <tt>_id</tt> field to specify unique 611 * groups. 612 * @param aggregations 613 * The specification for the group id and what fields to 614 * aggregate in the form of a document. 615 * @return This builder for chaining method calls. 616 */ 617 public Builder group(final AggregationGroupId.Builder id, 618 final AggregationGroupField... aggregations) { 619 return group(id.buildId(), aggregations); 620 } 621 622 /** 623 * Adds a <tt>$group</tt> operation to the pipeline to aggregate 624 * documents passing this point in the pipeline into a group of 625 * documents. 626 * 627 * @param aggregations 628 * The specification for the group id and what fields to 629 * aggregate in the form of a document. 630 * @return This builder for chaining method calls. 631 */ 632 public Builder group(final DocumentAssignable aggregations) { 633 return step("$group", aggregations); 634 } 635 636 /** 637 * Adds a <tt>$group</tt> operation to the pipeline to aggregate 638 * documents passing this point in the pipeline into a group of 639 * documents. 640 * <p> 641 * This method is intended to construct groups with complex dynamic or 642 * static id documents. The {@link AggregationGroupId.Builder} 643 * implements the {@link DocumentBuilder} for construction of arbitrary 644 * complex _id documents. 645 * </p> 646 * <blockquote> 647 * 648 * <pre> 649 * <code> 650 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id}; 651 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set}; 652 * 653 * {@link Aggregate.Builder} builder = new Aggregation.Builder(); 654 * builder.group( 655 * id().addInteger("i", 1), 656 * set("resultField1").uniqueValuesOf("$field3"), 657 * set("resultField2").first("$field4"), 658 * set("count").count() ); 659 * </code> 660 * </pre> 661 * 662 * </blockquote> 663 * 664 * @param id 665 * The builder for the <tt>_id</tt> field to specify unique 666 * groups. 667 * @param aggregations 668 * The specification for the group id and what fields to 669 * aggregate in the form of a document. 670 * @return This builder for chaining method calls. 671 */ 672 public Builder group(final DocumentAssignable id, 673 final AggregationGroupField... aggregations) { 674 return group(new AggregationGroupId(id), aggregations); 675 } 676 677 /** 678 * Adds a <tt>$limit</tt> operation to the pipeline to stop producing 679 * documents passing this point in the pipeline once the limit of 680 * documents is reached. 681 * 682 * @param numberOfDocuments 683 * The number of documents to allow past this point in the 684 * pipeline. 685 * @return This builder for chaining method calls. 686 * 687 * @see <a 688 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit">Aggregation 689 * Framework Operators - $limit</a> 690 */ 691 public Builder limit(final int numberOfDocuments) { 692 return step("$limit", numberOfDocuments); 693 } 694 695 /** 696 * Adds a <tt>$limit</tt> operation to the pipeline to stop producing 697 * documents passing this point in the pipeline once the limit of 698 * documents is reached. 699 * 700 * @param numberOfDocuments 701 * The number of documents to allow past this point in the 702 * pipeline. 703 * @return This builder for chaining method calls. 704 * 705 * @see <a 706 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit"> 707 * Aggregation Framework Operators - $limit</a> 708 */ 709 public Builder limit(final long numberOfDocuments) { 710 return step("$limit", numberOfDocuments); 711 } 712 713 /** 714 * Adds a <tt>$match</tt> operation to the pipeline to filter documents 715 * passing this point in the pipeline. 716 * <p> 717 * This method may be used with the {@link QueryBuilder} to easily 718 * specify the criteria to match against. <blockquote> 719 * 720 * <pre> 721 * <code> 722 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where} 723 * 724 * Aggregation.Builder builder = new Aggregation.Builder(); 725 * 726 * builder.match( where("f").greaterThan(23).lessThan(42).and("g").lessThan(3) ); 727 * ... 728 * </code> 729 * </pre> 730 * 731 * </blockquote> 732 * </p> 733 * 734 * @param query 735 * The query to match documents against. 736 * @return This builder for chaining method calls. 737 * 738 * @see <a 739 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_match"> 740 * Aggregation Framework Operators - $match</a> 741 */ 742 public Builder match(final DocumentAssignable query) { 743 return step("$match", query); 744 } 745 746 /** 747 * Sets the maximum number of milliseconds to allow the command to run 748 * before aborting the request on the server. 749 * <p> 750 * This method equivalent to {@link #setMaximumTimeMilliseconds(long) 751 * setMaximumTimeMilliseconds(timeLimitUnits.toMillis(timeLimit)}. 752 * </p> 753 * 754 * @param timeLimit 755 * The new maximum amount of time to allow the command to 756 * run. 757 * @param timeLimitUnits 758 * The units for the maximum amount of time to allow the 759 * command to run. 760 * 761 * @return This {@link Builder} for method call chaining. 762 * 763 * @since MongoDB 2.6 764 */ 765 public Builder maximumTime(final long timeLimit, 766 final TimeUnit timeLimitUnits) { 767 return setMaximumTimeMilliseconds(timeLimitUnits 768 .toMillis(timeLimit)); 769 } 770 771 /** 772 * Adds a <tt>$out</tt> operation to the pipeline to write all of the 773 * output documents to the specified collection. 774 * <p> 775 * This method also forces the {@link ReadPreference} to be 776 * {@link ReadPreference#PRIMARY}. 777 * </p> 778 * 779 * @param collectionName 780 * The name of the collection to output the results to. 781 * @return This builder for chaining method calls. 782 * 783 * @see <a 784 * href="http://docs.mongodb.org/master/reference/operator/aggregation/out">Aggregation 785 * $out Operator</a> 786 */ 787 public Builder out(final String collectionName) { 788 setReadPreference(ReadPreference.PRIMARY); 789 return step("$out", collectionName); 790 } 791 792 /** 793 * Adds a <tt>$project</tt> operation to the pipeline to create a 794 * projection of the documents passing this point in the pipeline. 795 * <p> 796 * This method is intended to be used with the 797 * {@link AggregationProjectFields} and 798 * {@link com.allanbank.mongodb.builder.expression.Expressions 799 * Expressions} static helper methods. 800 * </p> 801 * <blockquote> 802 * 803 * <pre> 804 * <code> 805 * import static {@link AggregationProjectFields#include com.allanbank.mongodb.builder.AggregationProjectFields.include}; 806 * import static {@link com.allanbank.mongodb.builder.expression.Expressions com.allanbank.mongodb.builder.expression.Expressions.*}; 807 * 808 * 809 * Aggregation.Builder builder = new Aggregation.Builder(); 810 * ... 811 * builder.project( 812 * include("chr", "begin", "end", "calledPloidy"), 813 * set("window", 814 * multiply( 815 * divide( 816 * subtract( 817 * field("begin"), 818 * mod(field("begin"), constant(interval))), 819 * constant(interval)), 820 * constant(interval)))); 821 * ... 822 * </code> 823 * </pre> 824 * 825 * </blockquote> 826 * 827 * @param fields 828 * The fields to copy into the projected results. 829 * @param elements 830 * The computed elements based on {@link Expressions}. 831 * @return This builder for chaining method calls. 832 */ 833 public Builder project(final AggregationProjectFields fields, 834 final Element... elements) { 835 final List<IntegerElement> fieldElements = fields.toElements(); 836 837 final List<Element> allElements = new ArrayList<Element>( 838 fieldElements.size() + elements.length); 839 allElements.addAll(fieldElements); 840 allElements.addAll(Arrays.asList(elements)); 841 842 return step("$project", allElements); 843 } 844 845 /** 846 * Adds a <tt>$project</tt> operation to the pipeline to create a 847 * projection of the documents passing this point in the pipeline. 848 * 849 * @param projection 850 * The specification for the projection to perform. 851 * @return This builder for chaining method calls. 852 */ 853 public Builder project(final DocumentAssignable projection) { 854 return step("$project", projection); 855 } 856 857 /** 858 * Adds a <tt>$redact</tt> operation to potentially prune sub-documents 859 * from the results. 860 * 861 * @param ifExpression 862 * The expression to evaluate to determine if the current 863 * sub-document should be pruned or not. 864 * @param thenOption 865 * Operation to apply if the {@code ifExpression} evaluates 866 * to true. 867 * @param elseOption 868 * Operation to apply if the {@code ifExpression} evaluates 869 * to false. 870 * @return This builder for chaining method calls. 871 */ 872 public Builder redact(final DocumentAssignable ifExpression, 873 final RedactOption thenOption, final RedactOption elseOption) { 874 myRequiredVersion = Version.later(myRequiredVersion, 875 REDACT_REQUIRED_VERSION); 876 877 final DocumentBuilder doc = BuilderFactory.start(); 878 doc.push(Expressions.CONDITION) 879 .add(new DocumentElement("if", ifExpression.asDocument())) 880 .add("then", thenOption.getToken()) 881 .add("else", elseOption.getToken()); 882 883 return step("$redact", doc); 884 } 885 886 /** 887 * Adds a <tt>$redact</tt> operation to potentially prune sub-documents 888 * from the results. 889 * 890 * @param ifExpression 891 * The expression to evaluate to determine if the current 892 * sub-document should be pruned or not. 893 * @param thenOption 894 * Operation to apply if the {@code ifExpression} evaluates 895 * to true. 896 * @param elseOption 897 * Operation to apply if the {@code ifExpression} evaluates 898 * to false. 899 * @return This builder for chaining method calls. 900 */ 901 public Builder redact(final Expression ifExpression, 902 final RedactOption thenOption, final RedactOption elseOption) { 903 myRequiredVersion = Version.later(myRequiredVersion, 904 REDACT_REQUIRED_VERSION); 905 906 final DocumentBuilder doc = BuilderFactory.start(); 907 doc.push(Expressions.CONDITION).add(ifExpression.toElement("if")) 908 .add("then", thenOption.getToken()) 909 .add("else", elseOption.getToken()); 910 911 return step("$redact", doc); 912 } 913 914 /** 915 * Resets the builder back to an empty pipeline. 916 * 917 * @return This builder for chaining method calls. 918 */ 919 public Builder reset() { 920 myPipeline.reset(); 921 myReadPreference = null; 922 myMaximumTimeMilliseconds = 0; 923 myBatchSize = 0; 924 myLimit = 0; 925 myUseCursor = false; 926 myAllowDiskUsage = false; 927 myRequiredVersion = REQUIRED_VERSION; 928 929 return this; 930 } 931 932 /** 933 * Sets to true if the aggregation command can spill to disk. 934 * <p> 935 * This method also sets the builder to use a cursor to true. 936 * </p> 937 * 938 * @param allowDiskUsage 939 * The new value for if the aggregation command can spill to 940 * disk. 941 * @return This builder for chaining method calls. 942 */ 943 public Builder setAllowDiskUsage(final boolean allowDiskUsage) { 944 myRequiredVersion = Version.later(myRequiredVersion, 945 ALLOW_DISK_USAGE_REQUIRED_VERSION); 946 947 myAllowDiskUsage = allowDiskUsage; 948 return setUseCursor(true); 949 } 950 951 /** 952 * Sets the value of the number of documents to be returned in each 953 * batch. 954 * <p> 955 * This method also sets the builder to use a cursor to true. 956 * </p> 957 * 958 * @param batchSize 959 * The new value for the number of documents to be returned 960 * in each batch. 961 * @return This builder for chaining method calls. 962 */ 963 public Builder setBatchSize(final int batchSize) { 964 myBatchSize = batchSize; 965 return setUseCursor(true); 966 } 967 968 /** 969 * Sets the value of the total number of documents to be returned. 970 * <p> 971 * This method also sets the builder to use a cursor to true. 972 * </p> 973 * 974 * @param limit 975 * The new value for the total number of documents to be 976 * returned. 977 * @return This builder for chaining method calls. 978 */ 979 public Builder setCusorLimit(final int limit) { 980 myLimit = limit; 981 return setUseCursor(true); 982 } 983 984 /** 985 * Sets the maximum number of milliseconds to allow the command to run 986 * before aborting the request on the server. 987 * 988 * @param maximumTimeMilliseconds 989 * The new maximum number of milliseconds to allow the 990 * command to run. 991 * @return This {@link Builder} for method call chaining. 992 * 993 * @since MongoDB 2.6 994 */ 995 public Builder setMaximumTimeMilliseconds( 996 final long maximumTimeMilliseconds) { 997 myRequiredVersion = Version.later(myRequiredVersion, 998 MAX_TIMEOUT_VERSION); 999 1000 myMaximumTimeMilliseconds = maximumTimeMilliseconds; 1001 return this; 1002 } 1003 1004 /** 1005 * Sets the {@link ReadPreference} specifying which servers may be used 1006 * to execute the aggregation. 1007 * <p> 1008 * If not set or set to <code>null</code> then the 1009 * {@link MongoCollection} instance's {@link ReadPreference} will be 1010 * used. 1011 * </p> 1012 * 1013 * @param readPreference 1014 * The read preferences specifying which servers may be used. 1015 * @return This builder for chaining method calls. 1016 * 1017 * @see MongoCollection#getReadPreference() 1018 */ 1019 public Builder setReadPreference(final ReadPreference readPreference) { 1020 myReadPreference = readPreference; 1021 return this; 1022 } 1023 1024 /** 1025 * Sets to true if the aggregation results should be returned as a 1026 * cursor. 1027 * 1028 * @param useCursor 1029 * The new value for if the results should be returned via a 1030 * cursor. 1031 * @return This builder for chaining method calls. 1032 */ 1033 public Builder setUseCursor(final boolean useCursor) { 1034 myRequiredVersion = Version 1035 .later(myRequiredVersion, CURSOR_VERSION); 1036 myUseCursor = useCursor; 1037 return this; 1038 } 1039 1040 /** 1041 * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified 1042 * number of documents before allowing any document past this point in 1043 * the pipeline. 1044 * 1045 * @param numberOfDocuments 1046 * The number of documents to skip past before allowing any 1047 * documents to pass this point in the pipeline. 1048 * @return This builder for chaining method calls. 1049 * 1050 * @see <a 1051 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip"> 1052 * Aggregation Framework Operators - $skip</a> 1053 */ 1054 public Builder skip(final int numberOfDocuments) { 1055 return step("$skip", numberOfDocuments); 1056 } 1057 1058 /** 1059 * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified 1060 * number of documents before allowing any document past this point in 1061 * the pipeline. 1062 * 1063 * @param numberOfDocuments 1064 * The number of documents to skip past before allowing any 1065 * documents to pass this point in the pipeline. 1066 * @return This builder for chaining method calls. 1067 * 1068 * @see <a 1069 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip"> 1070 * Aggregation Framework Operators - $skip</a> 1071 */ 1072 public Builder skip(final long numberOfDocuments) { 1073 return step("$skip", numberOfDocuments); 1074 } 1075 1076 /** 1077 * Adds a <tt>$sort</tt> operation to sort the documents passing this 1078 * point based on the sort specification provided. 1079 * <p> 1080 * This method is intended to be used with the {@link Sort} class's 1081 * static methods: <blockquote> 1082 * 1083 * <pre> 1084 * <code> 1085 * import static {@link Sort#asc(String) com.allanbank.mongodb.builder.Sort.asc}; 1086 * import static {@link Sort#desc(String) com.allanbank.mongodb.builder.Sort.desc}; 1087 * 1088 * Aggregation.Builder builder = new Aggregation.Builder(); 1089 * 1090 * builder.setSort( asc("f"), desc("g") ); 1091 * ... 1092 * </code> 1093 * </pre> 1094 * 1095 * </blockquote> 1096 * 1097 * @param sortFields 1098 * The sort fields to use. 1099 * @return This builder for chaining method calls. 1100 * 1101 * @see <a 1102 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort"> 1103 * Aggregation Framework Operators - $sort</a> 1104 */ 1105 public Builder sort(final IntegerElement... sortFields) { 1106 return step("$sort", sortFields); 1107 } 1108 1109 /** 1110 * Adds a <tt>$sort</tt> operation to sort the documents passing this 1111 * point based on the sort fields provides in ascending order. 1112 * 1113 * @param sortFields 1114 * The sort fields to use in ascending order. 1115 * @return This builder for chaining method calls. 1116 * 1117 * @see <a 1118 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort"> 1119 * Aggregation Framework Operators - $sort</a> 1120 */ 1121 public Builder sort(final String... sortFields) { 1122 final IntegerElement[] elements = new IntegerElement[sortFields.length]; 1123 for (int i = 0; i < sortFields.length; ++i) { 1124 elements[i] = Sort.asc(sortFields[i]); 1125 } 1126 return sort(elements); 1127 } 1128 1129 /** 1130 * Adds a generic step to the builder's pipeline. 1131 * 1132 * @param operator 1133 * The operator to add to the pipeline. 1134 * @param stepDocument 1135 * The document containing the details of the step to apply. 1136 * @return This builder for chaining method calls. 1137 */ 1138 public Builder step(final String operator, 1139 final DocumentAssignable stepDocument) { 1140 myPipeline.push().addDocument(operator, stepDocument.asDocument()); 1141 return this; 1142 } 1143 1144 /** 1145 * Adds a generic step to the builder's pipeline. 1146 * 1147 * @param operator 1148 * The operator to add to the pipeline. 1149 * @param value 1150 * The value for the operator. 1151 * @return This builder for chaining method calls. 1152 */ 1153 public Builder step(final String operator, final double value) { 1154 myPipeline.push().addDouble(operator, value); 1155 return this; 1156 } 1157 1158 /** 1159 * Adds a generic step to the builder's pipeline. 1160 * 1161 * @param operator 1162 * The operator to add to the pipeline. 1163 * @param elements 1164 * The elements containing the details of the step to apply. 1165 * @return This builder for chaining method calls. 1166 */ 1167 public Builder step(final String operator, final Element... elements) { 1168 return step(operator, Arrays.asList(elements)); 1169 } 1170 1171 /** 1172 * Adds a generic step to the builder's pipeline. 1173 * 1174 * @param operator 1175 * The operator to add to the pipeline. 1176 * @param value 1177 * The value for the operator. 1178 * @return This builder for chaining method calls. 1179 */ 1180 public Builder step(final String operator, final int value) { 1181 myPipeline.push().addInteger(operator, value); 1182 return this; 1183 } 1184 1185 /** 1186 * Adds a generic step to the builder's pipeline. 1187 * 1188 * @param operator 1189 * The operator to add to the pipeline. 1190 * @param elements 1191 * The elements containing the details of the step to apply. 1192 * @return This builder for chaining method calls. 1193 */ 1194 public Builder step(final String operator, final List<Element> elements) { 1195 final DocumentBuilder operatorBuilder = myPipeline.push().push( 1196 operator); 1197 for (final Element element : elements) { 1198 operatorBuilder.add(element); 1199 } 1200 return this; 1201 } 1202 1203 /** 1204 * Adds a generic step to the builder's pipeline. 1205 * 1206 * @param operator 1207 * The operator to add to the pipeline. 1208 * @param value 1209 * The value for the operator. 1210 * @return This builder for chaining method calls. 1211 */ 1212 public Builder step(final String operator, final long value) { 1213 myPipeline.push().addLong(operator, value); 1214 return this; 1215 } 1216 1217 /** 1218 * Adds a generic step to the builder's pipeline. 1219 * 1220 * @param operator 1221 * The operator to add to the pipeline. 1222 * @param value 1223 * The value for the operator. 1224 * @return This builder for chaining method calls. 1225 */ 1226 public Builder step(final String operator, final String value) { 1227 myPipeline.push().addString(operator, value); 1228 return this; 1229 } 1230 1231 /** 1232 * Return the JSON for the current pipeline that would be constructed by 1233 * the builder. 1234 */ 1235 @Override 1236 public String toString() { 1237 return new ArrayElement("$pipeline", build().getPipeline()) 1238 .toString(); 1239 } 1240 1241 /** 1242 * Adds a <tt>$unwind</tt> operation generate a document for each 1243 * element of the specified array field with the array replaced with the 1244 * value of the element. 1245 * 1246 * @param fieldName 1247 * The name of the array field within the document to unwind. 1248 * This name must start with a '$'. If it does not a '$' will 1249 * be prepended to the field name.. 1250 * @return This builder for chaining method calls. 1251 * 1252 * @see <a 1253 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_unwind"> 1254 * Aggregation Framework Operators - $unwind</a> 1255 */ 1256 public Builder unwind(final String fieldName) { 1257 if (fieldName.startsWith("$")) { 1258 step("$unwind", fieldName); 1259 } 1260 else { 1261 step("$unwind", "$" + fieldName); 1262 } 1263 return this; 1264 } 1265 1266 /** 1267 * Sets that the results should be returned using a cursor. 1268 * <p> 1269 * This method delegates to {@link #setUseCursor(boolean) 1270 * setUseCursor(true)}. 1271 * </p> 1272 * 1273 * @return This builder for chaining method calls. 1274 */ 1275 public Builder useCursor() { 1276 return setUseCursor(true); 1277 } 1278 1279 /** 1280 * Sets to true if the aggregation results should be returned as a 1281 * cursor. 1282 * <p> 1283 * This method delegates to {@link #setUseCursor(boolean)}. 1284 * </p> 1285 * 1286 * @param useCursor 1287 * The new value for if the results should be returned via a 1288 * cursor. 1289 * @return This builder for chaining method calls. 1290 */ 1291 public Builder useCursor(final boolean useCursor) { 1292 return setUseCursor(useCursor); 1293 } 1294 } 1295 }