Stream keys on pbc issues, with early close of the stream [JIRA: CLIENTS-1051] #507

Open
Guibod opened this issue Nov 29, 2016 · 3 comments

Comments

Guibod commented Nov 29, 2016

If you close a stream before it is finished, it still tries to drain the open socket of all incoming data.
Maybe you should close the resource instead of releasing it in that case.

    def close(self):
        # We have to drain the socket to make sure that we don't get
        # weird responses when some other request comes after a
        # failed/prematurely-terminated one.
        try:
            while self.next():
                pass
        except StopIteration:
            pass
        self.resource.release()

Of course, you cannot do that with the client wrapper, which yields results through an iterator and is not closeable.
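
To make the trade-off concrete, here is a toy model of the two ways to close a stream early. It is a sketch only, not the riak client's code: `FakeSocket`, `Pool`, `Stream`, and `early_close` are hypothetical names, and the "socket" is just an in-memory list of unread chunks. Draining costs one read per remaining chunk before the connection goes back to the pool, while discarding skips the reads and gives up the connection.

```python
class FakeSocket:
    """Stands in for a connection that still has unread chunks queued."""

    def __init__(self, chunks):
        self.pending = list(chunks)
        self.reads = 0
        self.closed = False

    def read(self):
        # Count every read so the cost of draining is visible.
        self.reads += 1
        return self.pending.pop(0) if self.pending else None

    def close(self):
        self.closed = True
        self.pending = []


class Pool:
    """Minimal pool: released sockets are kept for reuse, deleted ones are not."""

    def __init__(self):
        self.idle = []

    def release(self, sock):
        self.idle.append(sock)

    def delete(self, sock):
        sock.close()


class Stream:
    def __init__(self, sock, pool):
        self.sock = sock
        self.pool = pool

    def next_chunk(self):
        return self.sock.read()

    def close_draining(self):
        # Read everything that is left so the socket is clean before reuse.
        while self.sock.read() is not None:
            pass
        self.pool.release(self.sock)

    def close_discarding(self):
        # Skip the reads entirely; the socket is closed and never reused.
        self.pool.delete(self.sock)


def early_close(discard, total=1000):
    """Consume one chunk, close early, return (reads, idle sockets)."""
    pool = Pool()
    sock = FakeSocket(range(total))
    stream = Stream(sock, pool)
    stream.next_chunk()
    if discard:
        stream.close_discarding()
    else:
        stream.close_draining()
    return sock.reads, len(pool.idle)
```

With the default of 1000 queued chunks, the draining variant performs every remaining read plus the one that detects the end, while the discarding variant stops after the single read the caller asked for.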

@Basho-JIRA changed the title from "Stream keys on pbc issues, with early close of the stream" to "Stream keys on pbc issues, with early close of the stream [JIRA: CLIENTS-1051]" on Nov 29, 2016

Guibod commented Nov 29, 2016

I also suggest updating the documentation, because the stream_keys() wrapper cannot be closed manually as the documentation states. The wrapper does the try/finally for us.
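
As an illustration of that point, the sketch below shows how a generator wrapper with its own try/finally behaves in plain Python. It assumes the wrapper is a generator function, which this thread suggests but the sketch does not verify; `stream_items` and `take_two` are hypothetical names, not riak client APIs. The caller never sees the underlying stream object, and the cleanup in `finally` fires on its own when the generator is closed.

```python
def stream_items(source, events):
    """Generator wrapper: the cleanup lives in its own try/finally."""
    events.append("acquire")
    try:
        for item in source:
            yield item
    finally:
        # Runs on exhaustion, on gen.close(), and when an abandoned
        # generator is garbage-collected after an early break.
        events.append("cleanup")


def take_two(events):
    """Read two items, then stop early by closing the generator."""
    gen = stream_items(iter(range(100)), events)
    first = next(gen)
    second = next(gen)
    gen.close()  # raises GeneratorExit at the paused yield
    return [first, second]
```

The only control the caller has is when the generator is closed, not what the cleanup does, so whatever the wrapper puts in `finally` (draining, in the case reported here) always runs.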


Guibod commented Nov 29, 2016

I've solved it by deleting my resource:

    # Obviously this is a method in a class with self.riak = client and
    # self.bucket = the bucket; bytes_to_str is assumed to come from riak.util.
    def stream(self, limit=10, timeout=None):
        i = 0
        resource = self.riak._acquire()
        transport = resource.object
        stream = transport.stream_keys(self.bucket, timeout=timeout)
        stream.attach(resource)

        try:
            for keylist in stream:
                for key in keylist:
                    key = bytes_to_str(key)
                    i += 1
                    if limit and i > limit:
                        logger.debug('Stream limit reached (%d)', limit)
                        # End the generator with return: raising StopIteration
                        # here becomes RuntimeError on Python 3.7+ (PEP 479).
                        return
                    yield self.bucket.get(key)
        finally:
            # Delete the pooled resource instead of draining and releasing it.
            self.riak._choose_pool().delete_resource(stream.resource)
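
The limit logic on its own can also be written with `itertools.islice`, which avoids ending a generator by hand (under PEP 479, a StopIteration raised inside a generator body surfaces as RuntimeError on Python 3.7+). This is a minimal sketch under stated assumptions: `limited` and its `cleanup` callback are hypothetical stand-ins for the method above and its resource deletion, and the key lists are plain Python lists rather than a riak stream.

```python
from itertools import chain, islice


def limited(keylists, limit, cleanup):
    """Yield at most `limit` keys from an iterable of key lists."""
    try:
        keys = chain.from_iterable(keylists)
        # A limit of None or 0 means "no limit", like `if limit and ...`.
        for key in islice(keys, limit or None):
            yield key
    finally:
        # Runs whether the limit was hit, the input ran out,
        # or the caller closed the generator early.
        cleanup()
```

For example, `list(limited([['a', 'b'], ['c'], ['d', 'e']], 3, release))` stops after the third key and then calls `release` once, where `release` is whatever callable the caller supplies.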

@lukebakken lukebakken added this to the riak-python-client-2.7.0 milestone Nov 29, 2016
@lukebakken lukebakken self-assigned this Nov 29, 2016
@lukebakken
Contributor

Hello ... I think I get what's going on, but it would be more helpful to have a complete example if possible. Can you provide one?

@lukebakken lukebakken modified the milestones: riak-python-client-2.7.0, riak-python-client-2.7.1 Dec 12, 2016
@lukebakken lukebakken modified the milestones: riak-python-client-2.7.1, riak-python-client-3.0.0 Feb 22, 2017