Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to trap failures #4

Closed
Gauravshah opened this issue Jun 25, 2015 · 11 comments
Closed

Unable to trap failures #4

Gauravshah opened this issue Jun 25, 2015 · 11 comments

Comments

@Gauravshah
Copy link

I am trying to use the jar as a library and I am unable to trap any errors.

I am using the following class

        class KinesisFutureCallback
          include FutureCallback
          def initialize logger
            @logger = logger
          end
          def onFailure(e)
            puts "failure"
            @logger.error "Failed in publishing event to Kinesis"
            @logger.error e
          end

          def onSuccess(r)
            puts "success"
            @logger.info "Published event to kinesis #{r}"
          end
        end
@kinesis_callback = KinesisFutureCallback.new(@logger)
            Futures.addCallback(
                @kinesis_producer.addUserRecord(
                    @kinesis_stream_name,
                    Time.now.to_i.to_s,
                    ByteBuffer.wrap({:foo => :bar}.to_json.to_java_bytes)
                ),
                @kinesis_callback)

I am able to trap success. But even if I try to turn off my internet connection I do not end up in the failure block. The library auto-retries. Also if I send a STOP signal to the application it does not terminate at that point. It keeps retrying.

Environment:

  • JRuby 1.7.12 , 1.9 mode
  • JDK "1.8.0_40-ea"
  • Mac OSX 10.10.3
  • amazon-kinesis-producer-0.9.0.jar
  • protobuf-java-2.6.1.jar
  • guava-18.0.jar
  • commons-lang-2.6.jar
  • commons-compress-1.9.jar
  • commons-io-2.4.jar
  • slf4j-simple-1.7.12.jar
  • slf4j-api-1.7.12.jar
@kevincdeng
Copy link
Contributor

Hi Gaurav,

By STOP signal do you mean sending SIGSTOP to the process? SIGSTOP does not (cause the OS to) terminate processes. Have you tried using SIGTERM or SIGKILL?

I'm looking into the first issue where loss of connectivity does not cause requests to fail.

@Gauravshah
Copy link
Author

Hi Kevin,

I meant SIGTERM . So lets say that the buffer is empty, sending SIGTERM terminates application gracefully. But lets say the internet was off and it had some data in the buffer then SIGTERM doesn't terminate the program.

@kevincdeng
Copy link
Contributor

Thanks Gaurav. I'll try to reproduce this. If confirmed we'll fix it in an upcoming release.

@Gauravshah
Copy link
Author

Let me know if I can help with some detailed steps or a demo. Thanks Kevin

@kevincdeng
Copy link
Contributor

Hi Gaurav,

The 0.10.0 release is now available and should address these issues. Let me know how it goes when you get a chance to try it out.

@Gauravshah
Copy link
Author

sure will let you know in 3 days. Thanks @kevincdeng

@Gauravshah
Copy link
Author

I am able trap failures now, Thanks @kevincdeng . It gives me a an instance of UserRecordResult which has a list of Attempt . Is there any way I can the message string that was attempted ? So that we have a fallback if something had failed.

@kevincdeng
Copy link
Contributor

Do you mean the actual payload of the record? The UserRecordResult object does not carry that at the moment. To have access to it you can bind the String that is used as the payload to the callback, either by saving it in instances of an explicit class that implements FutureCallback, or capturing it in a closure. Here's some sample code for the latter:

String partitionKey = ...
String payload =  ...
ByteBuffer data = ByteBuffer.wrap(payload.getBytes("UTF-8"));

ListenableFuture<UserRecordResult> f =
        kinesis.addUserRecord(STREAM_NAME, partitionKey, data);
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
        long totalTime = result.getAttempts().stream()
                .mapToLong(a -> a.getDelay() + a.getDuration())
                .sum();
        // Only log with a small probability, otherwise it'll be very spammy
        if (RANDOM.nextDouble() < 1e-5) {
            log.info(String.format(
                    "Succesfully put record, partitionKey=%s, payload=%s, " +
                    "sequenceNumber=%s, shardId=%s, took %d attempts, totalling %s ms",
                    partitionKey, payload, result.getSequenceNumber(),
                    result.getShardId(), result.getAttempts().size(), totalTime));
        }
    }

    @Override
    public void onFailure(Throwable t) {
        ...
    };
});

For Java 7 the captured variables have to be marked final.

@Gauravshah
Copy link
Author

Yes, I am referring to the payload of data. Thanks for the code sample. But that would not be possible in production environment since I am using only one object of FutureCallback over all the events, is that not the recommended approach ?

@kevincdeng
Copy link
Contributor

That depends. If your callbacks do not have state then a singleton is definitely fine. On the other hand it's also perfectly reasonable to have state in the callbacks. Since in this case we want to save some context in the callback, we'd want to go with the latter.

If you don't want to write the callback code inline (to capture the context automatically in the closure), you can always create a class that implements FutureCallback<UserRecordResult>. This class can take the context you want to save in its constructor (or factory method) and save it into an instance variable. You'd then create a new instance of the class per user record.

@Gauravshah
Copy link
Author

Thanks for the explanation @kevincdeng . Will do that for now. Will still love to have ability to get payload from UserRecordResult some time in future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants