feat: add max/min throttling options to BulkWriterOptions#400
feat: add max/min throttling options to BulkWriterOptions#400thebrianchen merged 6 commits intomasterfrom
Conversation
| // The initial validation step ensures that the maxOpsPerSecond is | ||
| // greater than initialOpsPerSecond. If this inequality is true, that | ||
| // means initialOpsPerSecond was not set and maxOpsPerSecond is less | ||
| // than the default starting rate. |
There was a problem hiding this comment.
These comments need some reflowing to match the Java line length.
|
|
||
| if (initialRate < 1) { | ||
| throw FirestoreException.invalidState( | ||
| "Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: " |
There was a problem hiding this comment.
| "Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: " | |
| "Value for argument 'initialOpsPerSecond' must be greater than 1, but was: " |
Easier to read, me thinks.
| if (maxRate < 1) { | ||
| throw FirestoreException.invalidState( | ||
| "Value for argument 'maxOpsPerSecond' must be an integer within [1, Infinity], but was: " | ||
| + (int) maxRate); |
| final class BulkWriterOptions { | ||
| static final double DEFAULT_UNSET_VALUE = 1.1; | ||
| private final boolean throttling; | ||
| private double initialOpsPerSecond = DEFAULT_UNSET_VALUE; |
There was a problem hiding this comment.
Can we make these Double and initialize to null?
There was a problem hiding this comment.
Java doesn't allow setting Double to null, but I used Double.NaN.
| * @return The BulkWriterOptions object. | ||
| */ | ||
| @Nonnull | ||
| public static BulkWriterOptions withInitialOpsPerSecondThrottling(int initialOpsPerSecond) { |
There was a problem hiding this comment.
| public static BulkWriterOptions withInitialOpsPerSecondThrottling(int initialOpsPerSecond) { | |
| public static BulkWriterOptions withInitialOpsPerSecond(double initialOpsPerSecond) { |
| * The throttler's allowed operations per second does not ramp up past the specified | ||
| * operations per second. |
There was a problem hiding this comment.
This seems to follow from line above.
| private double maxOpsPerSecond = DEFAULT_UNSET_VALUE; | ||
|
|
||
| private final boolean enableThrottling; | ||
| BulkWriterOptions(boolean enableThrottling) { |
There was a problem hiding this comment.
Does this need to be package-private now?
| @@ -20,15 +20,31 @@ | |||
|
|
|||
| /** Options used to disable request throttling in BulkWriter. */ | |||
| final class BulkWriterOptions { | |||
There was a problem hiding this comment.
I think it might be time to make this an actual Builder. You can use AutoValue to reduce the amount of code you have to write. See
| @Nonnull | ||
| BulkWriter bulkWriter() { | ||
| return new BulkWriter(this, /* enableThrottling= */ true); | ||
| return new BulkWriter(this, new BulkWriterOptions(true)); |
There was a problem hiding this comment.
Add back /* enableThrottling= */ ?
| * @param initialCapacity Initial maximum number of operations per second. | ||
| * @param multiplier Rate by which to increase the capacity. | ||
| * @param multiplierMillis How often the capacity should increase in milliseconds. | ||
| * @param maximumCapacity Maximum number of allowed operations per second. The number of tokens |
There was a problem hiding this comment.
Isn't this different from the capacity? From what I can tell, the capacity grows unbounded and the rate is reduced artificially. The implementation is fine, but the name of the setting is a bit misleading.
There was a problem hiding this comment.
When the capacity is calculated, it's capped at the maximumCapacity, which I thought is short for "maximum allowed capacity".
When a new request is made, the number of tokens to allocate is done in calculateCapacity, which performs the Math.min() operation. This means that the capacity is bounded by maximumCapacity.
There was a problem hiding this comment.
I think in your implementation the total capacity can grow beyond maximumCapacity, but the rate at which tokens get deducted is limited. This implementation is correct, but the value that is passed here is not the maximum capacity, but rather the limit of tokens used at a time. e.g. after 5 seconds without requests the total capacity of the throttler could be 500, even if maximumCapacity is 100.
As stated, this is just a naming nit.
There was a problem hiding this comment.
Thanks for elaborating! Renamed to maximumRate.
Codecov Report
@@ Coverage Diff @@
## master #400 +/- ##
============================================
+ Coverage 72.40% 72.52% +0.11%
- Complexity 1043 1112 +69
============================================
Files 64 68 +4
Lines 5578 5910 +332
Branches 689 768 +79
============================================
+ Hits 4039 4286 +247
- Misses 1323 1370 +47
- Partials 216 254 +38
Continue to review full report at Codecov.
|
schmidt-sebastian
left a comment
There was a problem hiding this comment.
Please replace double with a nullable Double, which allows you to remove your Double.NaN checks. Otherwise, this looks good. Thanks!
| } else { | ||
| rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE); | ||
| double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND; | ||
| double maxRate = Integer.MAX_VALUE; |
There was a problem hiding this comment.
This seems strange. Should we use Double.POSITIVE_INFINITY?
| double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND; | ||
| double maxRate = Integer.MAX_VALUE; | ||
|
|
||
| if (!Double.isNaN(options.getInitialOpsPerSecond())) { |
There was a problem hiding this comment.
Please use nullable values (Double instead of double) and use a null-check.
There was a problem hiding this comment.
Done, but had to modify the AutoValue setters to support passing in int arguments.
| private void validateBulkWriterOptions(BulkWriterOptions options) { | ||
| double initialRate = options.getInitialOpsPerSecond(); | ||
| double maxRate = options.getMaxOpsPerSecond(); | ||
|
|
||
| if (initialRate < 1) { | ||
| throw FirestoreException.invalidState( | ||
| "Value for argument 'initialOpsPerSecond' must be greater than 1, but was: " | ||
| + (int) initialRate); | ||
| } | ||
|
|
||
| if (maxRate < 1) { | ||
| throw FirestoreException.invalidState( | ||
| "Value for argument 'maxOpsPerSecond' must be greater than 1, but was: " + (int) maxRate); | ||
| } | ||
|
|
||
| if (!Double.isNaN(initialRate) && !Double.isNaN(maxRate) && initialRate > maxRate) { | ||
| throw FirestoreException.invalidState( | ||
| "'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); | ||
| } | ||
|
|
||
| if (!options.getThrottlingEnabled() && (!Double.isNaN(initialRate) || !Double.isNaN(maxRate))) { | ||
| throw FirestoreException.invalidState( | ||
| "Cannot set 'initialRate' or 'maxRate' when 'throttlingEnabled' is set to false."); | ||
| } | ||
| } |
There was a problem hiding this comment.
Ideally, this validation would happen in the BulkWriterOptions.build() step. This would follow the precedent in other builders.
This might help: https://github.com/google/auto/blob/master/value/userguide/builders-howto.md#validate
| * @param initialCapacity Initial maximum number of operations per second. | ||
| * @param multiplier Rate by which to increase the capacity. | ||
| * @param multiplierMillis How often the capacity should increase in milliseconds. | ||
| * @param maximumCapacity Maximum number of allowed operations per second. The number of tokens |
There was a problem hiding this comment.
I think in your implementation the total capacity can grow beyond maximumCapacity, but the rate at which tokens get deducted is limited. This implementation is correct, but the value that is passed here is not the maximum capacity, but rather the limit of tokens used at a time. e.g. after 5 seconds without requests the total capacity of the throttler could be 500, even if maximumCapacity is 100.
As stated, this is just a naming nit.
🤖 I have created a release \*beep\* \*boop\* --- ## [2.2.0](https://www.github.com/googleapis/java-firestore/compare/v2.1.0...v2.2.0) (2021-01-20) ### Features * Add bundle proto building ([#271](https://www.github.com/googleapis/java-firestore/issues/271)) ([994835c](https://www.github.com/googleapis/java-firestore/commit/994835c0a3be077404afa60abd4d4685d17fb533)) * add bundle.proto from googleapis/googleapis ([#407](https://www.github.com/googleapis/java-firestore/issues/407)) ([37da386](https://www.github.com/googleapis/java-firestore/commit/37da386875d1b65121e8a9a92b1a000537f07625)) * add CollectionGroup#getPartitions(long) ([#478](https://www.github.com/googleapis/java-firestore/issues/478)) ([bab064e](https://www.github.com/googleapis/java-firestore/commit/bab064edde26325bf0041ffe28d4c63b97a089c5)) * add implicit ordering for startAt(DocumentReference) calls ([#417](https://www.github.com/googleapis/java-firestore/issues/417)) ([aae6dc9](https://www.github.com/googleapis/java-firestore/commit/aae6dc960f7c42830ceed23c65acaad3e457dcff)) * add max/min throttling options to BulkWriterOptions ([#400](https://www.github.com/googleapis/java-firestore/issues/400)) ([27a9397](https://www.github.com/googleapis/java-firestore/commit/27a9397f67e151d723241c80ccb2ec9f1bfbba1c)) * add success and error callbacks to BulkWriter ([#483](https://www.github.com/googleapis/java-firestore/issues/483)) ([3c05037](https://www.github.com/googleapis/java-firestore/commit/3c05037e8fce8d3ce4907fde85bd254fc98ea588)) * Implementation of Firestore Bundle Builder ([#293](https://www.github.com/googleapis/java-firestore/issues/293)) ([fd5ef90](https://www.github.com/googleapis/java-firestore/commit/fd5ef90b6681cc67aeee6c95f3de80267798dcd0)) * Release bundles ([#466](https://www.github.com/googleapis/java-firestore/issues/466)) ([3af065e](https://www.github.com/googleapis/java-firestore/commit/3af065e32b193931c805b576f410ad90124b43a7)) ### Bug Fixes * add @BetaApi, make BulkWriter public, and refactor Executor ([#497](https://www.github.com/googleapis/java-firestore/issues/497)) ([27ff9f6](https://www.github.com/googleapis/java-firestore/commit/27ff9f6dfa92cac9119d2014c24a0759baa44fb7)) * **build:** sample checkstyle violations ([#457](https://www.github.com/googleapis/java-firestore/issues/457)) ([777ecab](https://www.github.com/googleapis/java-firestore/commit/777ecabd1ce12cbc5f4169de6c23a90f98deac06)) * bulkWriter: writing to the same doc doesn't create a new batch ([#394](https://www.github.com/googleapis/java-firestore/issues/394)) ([259ece8](https://www.github.com/googleapis/java-firestore/commit/259ece8511db71ea79cc1a080eb785a15db88756)) * empty commit to trigger release-please ([fcef0d3](https://www.github.com/googleapis/java-firestore/commit/fcef0d302cd0a9339d82db73152289d6f9f67ff2)) * make BulkWriterOptions public ([#502](https://www.github.com/googleapis/java-firestore/issues/502)) ([6ea05be](https://www.github.com/googleapis/java-firestore/commit/6ea05beb3f27337bef910ca64f0e3f32de6b7e98)) * retry Query streams ([#426](https://www.github.com/googleapis/java-firestore/issues/426)) ([3513cd3](https://www.github.com/googleapis/java-firestore/commit/3513cd39ff43d26c8432c05ce20693350539ae8f)) * retry transactions that fail with expired transaction IDs ([#447](https://www.github.com/googleapis/java-firestore/issues/447)) ([5905438](https://www.github.com/googleapis/java-firestore/commit/5905438af6501353e978210808834a26947aae95)) * verify partition count before invoking GetPartition RPC ([#418](https://www.github.com/googleapis/java-firestore/issues/418)) ([2054ae9](https://www.github.com/googleapis/java-firestore/commit/2054ae971083277e1cf81c2b57500c40a6faa0ef)) ### Documentation * **sample:** normalize firestore sample's region tags ([#453](https://www.github.com/googleapis/java-firestore/issues/453)) ([b529245](https://www.github.com/googleapis/java-firestore/commit/b529245c75f770e1b47ca5d9561bab55a7610677)) ### Dependencies * remove explicit version for jackson ([#479](https://www.github.com/googleapis/java-firestore/issues/479)) ([e2aecfe](https://www.github.com/googleapis/java-firestore/commit/e2aecfec51465b8fb3413337a76f9a3de57b8500)) * update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.12 ([#367](https://www.github.com/googleapis/java-firestore/issues/367)) ([2bdd846](https://www.github.com/googleapis/java-firestore/commit/2bdd84693bbd968cafabd0e7ee56d1a9a7dc31ca)) * update dependency com.google.cloud:google-cloud-conformance-tests to v0.0.13 ([#411](https://www.github.com/googleapis/java-firestore/issues/411)) ([e6157b5](https://www.github.com/googleapis/java-firestore/commit/e6157b5cb532e0184125355b12115058e72afa67)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.0 ([#383](https://www.github.com/googleapis/java-firestore/issues/383)) ([cb39ee8](https://www.github.com/googleapis/java-firestore/commit/cb39ee820c2f67e22da623f5a6eaa7ee6bf351e2)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.10.2 ([#403](https://www.github.com/googleapis/java-firestore/issues/403)) ([991dd81](https://www.github.com/googleapis/java-firestore/commit/991dd810360e654fca0b53e0611da0cd77febc7c)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.12.1 ([#425](https://www.github.com/googleapis/java-firestore/issues/425)) ([b897ffa](https://www.github.com/googleapis/java-firestore/commit/b897ffa90427a8f597c02c24f80d1d162be48b23)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.13.0 ([#430](https://www.github.com/googleapis/java-firestore/issues/430)) ([0f8f218](https://www.github.com/googleapis/java-firestore/commit/0f8f218678c3ddebb73748c382cab8e38c2f140d)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.14.1 ([#446](https://www.github.com/googleapis/java-firestore/issues/446)) ([e241f8e](https://www.github.com/googleapis/java-firestore/commit/e241f8ebbfdf202f00424177c69962311b37fc88)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.15.0 ([#460](https://www.github.com/googleapis/java-firestore/issues/460)) ([b82fc35](https://www.github.com/googleapis/java-firestore/commit/b82fc3561d1a397438829ab69df24141481369a2)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.0 ([#481](https://www.github.com/googleapis/java-firestore/issues/481)) ([ae98824](https://www.github.com/googleapis/java-firestore/commit/ae988245e6d6391c85414e9b6f7ae1b8148c3a6d)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.16.1 ([4ace93c](https://www.github.com/googleapis/java-firestore/commit/4ace93c7be580a8f7870e71cad2dc19bb5fdef29)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.17.0 ([#487](https://www.github.com/googleapis/java-firestore/issues/487)) ([e11e472](https://www.github.com/googleapis/java-firestore/commit/e11e4723bc75727086bee0436492f458def29ff5)) * update dependency com.google.cloud:google-cloud-shared-dependencies to v0.18.0 ([#495](https://www.github.com/googleapis/java-firestore/issues/495)) ([f78720a](https://www.github.com/googleapis/java-firestore/commit/f78720a155f1294321f05266b9a546bbf2cb9a04)) * update jackson dependencies to v2.11.3 ([#396](https://www.github.com/googleapis/java-firestore/issues/396)) ([2e176e2](https://www.github.com/googleapis/java-firestore/commit/2e176e2f864262f31e6f93705fa7e794023b9649)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Porting throttling options and rate limiter test fix from node.