
Provide example of mixed Java + Python topology using storm-kafka #31

Closed
amontalenti opened this issue Jul 10, 2014 · 23 comments

@amontalenti
Contributor

Requested on the mailing list:

I would like to understand how I can use the Clojure DSL to submit a topology that has a spout written in Java and bolts written in Python.

The best first example would use the new official Kafka spout (storm-kafka) that is now included in Storm's 0.9.2 release. Combining that spout with Python bolts that process the data it emits would be the best first Python + Java interop example for streamparse, I think.
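
The Python half of such a topology is just an ordinary streamparse bolt; only the spout is JVM-based, and it gets wired in via the topology definition. A minimal sketch of what the bolt side might look like (the class name and the one-field tuple layout are made up for illustration):

from collections import Counter

from streamparse.bolt import Bolt

class PixelCounterBolt(Bolt):
    """Hypothetical bolt counting pixels coming off a JVM KafkaSpout."""

    def initialize(self, storm_conf, context):
        self.counts = Counter()

    def process(self, tup):
        pixel = tup.values[0]  # the raw Kafka message
        self.counts[pixel] += 1
        self.emit([pixel, self.counts[pixel]])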

@amontalenti amontalenti added this to the v0.1.0 milestone Jul 10, 2014
@amontalenti amontalenti self-assigned this Jul 10, 2014
@dan-blanchard
Member

Is this just about adding an example, or is support for Java + Python lacking at the moment? The discussion on #1 seems to indicate that you decided to use plain jars instead of uberjars, which makes submitting a Java + Python topology impossible (if I understand everything correctly).

@msukmanowsky
Contributor

It's a bit of both. JVM-based topologies will have dependencies that need to be bundled in the JAR when topologies are submitted to clusters. The problem, as #1 points out, is that we took the easy route by going with lein jar instead of lein uberjar, which excludes all JVM dependencies. @amontalenti said:

It works around the issue but isn't ideal -- the ideal way to handle this would be to make a lein profile for each language's runtime requirements (e.g. a profile named "python", one named "java", one named "clojure", whatever -- but in all cases, excluding the dependency on Storm itself) and then merge those profiles when making the uberjar to send to the cluster.

I think this is actually overkill; really, someone just needs to take a little bit of time to figure out the proper Leiningen config to exclude some of the Storm includes from the uberjar. No need for the separate profiles.

As for a Kafka example, this is something I've started on a few times but haven't found time to finish. I'm going to commit my changes for it right now if you'd like to poke at it, @dan-blanchard.

@msukmanowsky
Contributor

Example added. I haven't gotten this running yet @dan-blanchard, but feel free to look around if you'd like.

@dan-blanchard
Member

Thanks for adding that @msukmanowsky. I'm sure it'll come in handy.

I'm trying to create a topology with Java, Perl, and Python bolts, so I guess I'll have to figure out the uberjar thing sooner rather than later.

@amontalenti
Contributor Author

Note: there is a whole lot more documentation online about Storm's built-in Kafka spout -- the documentation and code were heavily updated for the forthcoming 0.9.3 release:

https://github.com/apache/incubator-storm/tree/master/external/storm-kafka

@msukmanowsky
Contributor

Full example has been added here: https://github.com/Parsely/streamparse/tree/feature/uberjar/examples/kafka-jvm. It installs a Vagrant-run server, uses it to seed a Kafka topic, and then reads from that topic with a JVM-based KafkaSpout. Would love to get help testing this, @amontalenti and @dan-blanchard. I've already asked @kbourgoin.

@dan-blanchard
Member

Thanks for putting this together @msukmanowsky! I'll take a look at this soon.

@dan-blanchard
Member

@msukmanowsky I finally had a chance to play with this today. Unfortunately, I've run into a few Python 3 compatibility issues with it.

In tasks.py:

  • Add from __future__ import print_function to make 2.7 happy
  • print statements should be changed to use the print function
  • pixels.next() ➡️ next(pixels)
  • xrange ➡️ range (add from six.moves import range for 2.7)
  • kafka-python requires that messages be sent as bytes with Python 3, so add some .encode('utf-8') bits to the arguments for producer.send_messages (see the sketch below)
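
Concretely, the fixes look roughly like this (a condensed sketch; producer, pixels, topic_name, and num_pixels are the names tasks.py already uses, but seed_pixels is my own condensation of the seed_kafka loop and the surrounding code differs):

from __future__ import print_function

from six.moves import range  # xrange on 2.7, the builtin range on 3.x

def seed_pixels(producer, pixels, topic_name, num_pixels=100000):
    """Condensed version of the seed_kafka loop with the fixes applied."""
    for _ in range(num_pixels):
        pixel = next(pixels)  # was pixels.next(), which breaks on Python 3
        # kafka-python under Python 3 wants bytes for topic and payload
        producer.send_messages(topic_name.encode('utf-8'),
                               pixel.encode('utf-8'))
    print('Seeded topic {0!r} with {1:,} fake pixels.'.format(
        topic_name, num_pixels))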

Even after making all those changes (and using the dev version of kafka-python that adds support for Python 3), I'm still running into this error:

Seeding Kafka (streamparse-box:9092) topic 'pixels' with 100,000 fake pixels.
Traceback (most recent call last):
  File "/Users/dblanchard/anaconda/envs/streamparse/bin/invoke", line 6, in <module>
    sys.exit(main())
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/cli.py", line 295, in main
    dispatch(sys.argv)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/cli.py", line 288, in dispatch
    return executor.execute(*tasks, dedupe=dedupe)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/executor.py", line 89, in execute
    task=task, name=name, args=args, kwargs=kwargs
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/executor.py", line 128, in _execute
    return task(*args, **kwargs)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/tasks.py", line 108, in __call__
    result = self.body(*args, **kwargs)
  File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 32, in wrapper
    raise e
  File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 29, in wrapper
    return func(*args, **kwargs)
  File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 78, in seed_kafka
    producer.send_messages(topic_name.encode('utf-8'), pixel)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/producer/simple.py", line 69, in send_messages
    partition = self._next_partition(topic)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/producer/simple.py", line 56, in _next_partition
    self.client.load_metadata_for_topics(topic)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/client.py", line 323, in load_metadata_for_topics
    kafka.common.check_error(topic_metadata)
  File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/common.py", line 215, in check_error
    raise error(response)
kafka.common.LeaderNotAvailableError: TopicMetadata(topic=b'pixels', error=5, partitions=[])

That error looks a lot like the one referenced here, but even after adding their suggested workaround, I'm still seeing the same problem.
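
For reference, the suggested workaround boils down to retrying the send while the broker finishes auto-creating the topic and electing a leader for its partitions. This is roughly what I tried (send_with_retries and the retry count/delay values are my own names and guesses):

import time

from kafka.common import LeaderNotAvailableError

def send_with_retries(producer, topic, message, retries=5, delay=1.0):
    """Retry sends while the broker auto-creates a brand-new topic;
    the first metadata requests fail with LeaderNotAvailableError
    until a leader has been elected for the fresh partitions."""
    for attempt in range(retries):
        try:
            return producer.send_messages(topic, message)
        except LeaderNotAvailableError:
            if attempt == retries - 1:
                raise
            time.sleep(delay)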

@dan-blanchard
Member

As for the README, I'd prefer to see the bit about modifying the SSH config just replaced by a suggestion to run:

vagrant ssh-config | sed -e 's/Host default/Host streamparse-box/' -e '/HostName 127.0.0.1/d' -e 's/Port 2222/Port 22/' -e 's/LogLevel FATAL/LogLevel INFO/' >> ~/.ssh/config

It is also somewhat problematic that people need to modify /etc/hosts for this to all work properly. If it were just a matter of sparse needing to connect via SSH, we should be able to get away with modifying the command I specified above to be:

vagrant ssh-config | sed -e 's/Host default/Host streamparse-box/' -e 's/HostName 127.0.0.1/HostName 192.168.50.50/' -e 's/Port 2222/Port 22/' -e 's/LogLevel FATAL/LogLevel INFO/' >> ~/.ssh/config

so that there is an SSH alias to 192.168.50.50.

I may be wrong, but it seems like the only reason you need the /etc/hosts bit is to make the Kafka seeding work, so maybe there's a way to make that use the IP instead of the hostname?
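
Something like this for the seeding code, maybe (untested; and if the broker advertises its hostname in the metadata it returns, the client may still need to resolve streamparse-box, in which case this won't be enough):

from kafka import KafkaClient, SimpleProducer

# Connect straight to the VM's private IP rather than the
# streamparse-box alias, so no /etc/hosts entry is required.
client = KafkaClient('192.168.50.50:9092')
producer = SimpleProducer(client)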

@dan-blanchard
Member

I thought I'd just move on and see if sparse submit would work (even if the topics weren't seeded properly), but that fails because of:

(Retrieving org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar from central)
(Could not transfer artifact org.xerial.snappy:snappy-java:jar:1.0.5 from/to central (http://repo1.maven.org/maven2/): GET request of: org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar from central failed)
(Could not find artifact org.xerial.snappy:snappy-java:jar:1.0.5 in clojars (https://clojars.org/repo/))
This could be due to a typo in :dependencies or network issues.
If you are behind a proxy, try setting the 'http_proxy' environment variable.
Uberjar aborting because jar failed: Could not resolve dependencies

This may be a weird network issue on my side. I can access the JAR directly from Maven central, just not via Leiningen for some reason.

@dan-blanchard
Member

I manually downloaded the snappy-java jar and put it in $HOME/.m2/repository/org/xerial/snappy/snappy-java/1.0.5 and now everything seems to work.

Weirdly enough, after submitting the topology and having it fail because the topic wasn't set up, I could then run invoke seed_kafka and it actually worked.

@dan-blanchard
Member

There are still some remaining Python 3 compatibility issues here, even with the changes I recommended. I just realized that there are a lot of missing messages when you run sparse submit with Python 3 versus 2. If I use Python 2 everything works fine, but with 3 I've noticed I'm missing:

  • All of the "compiling" messages.
  • Everything prefixed with [streamparse-box]

This is the entirety of the output for sparse submit -n pixelcount with Python 3:

Cleaning from prior builds...
Creating topology uberjar...
Uberjar created: /Users/dblanchard/Documents/streamparse/examples/kafka-jvm/_build/streamparse-kafka-sample-0.0.1-SNAPSHOT-standalone.jar
Deploying "pixelcount" topology...
ssh tunnel to Nimbus streamparse-box:6627 established.
Routing Python logging to /var/log/streamparse.
Running lein command to submit topology to nimbus:
lein run -m streamparse.commands.submit_topology/-main topologies/pixelcount.clj --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'topology.python.path="/data/virtualenvs/pixelcount/bin/python"' --option 'streamparse.log.path="/var/log/streamparse"' --option 'streamparse.log.max_bytes=1000000' --option 'streamparse.log.backup_count=10' --option 'streamparse.log.level="info"'
Compiling pixelcount.spouts.pixel_spout
Compiling pixelcount
{:option {streamparse.log.level info, streamparse.log.backup_count 10, streamparse.log.max_bytes 1000000, streamparse.log.path /var/log/streamparse, topology.python.path /data/virtualenvs/pixelcount/bin/python, topology.acker.executors 2, topology.workers 2}, :debug false, :port 6627, :host localhost, :help false}
658  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
696  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /Users/dblanchard/Documents/streamparse/examples/kafka-jvm/_build/streamparse-kafka-sample-0.0.1-SNAPSHOT-standalone.jar to assigned location: /home/storm/storm-data/nimbus/inbox/stormjar-c7cec8d3-7664-4456-a72a-10ffc4e58c97.jar
2850 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/storm/storm-data/nimbus/inbox/stormjar-c7cec8d3-7664-4456-a72a-10ffc4e58c97.jar
2851 [main] INFO  backtype.storm.StormSubmitter - Submitting topology pixelcount in distributed mode with conf {"streamparse.log.backup_count":10,"topology.workers":2,"topology.acker.executors":2,"streamparse.log.path":"\/var\/log\/streamparse","topology.python.path":"\/data\/virtualenvs\/pixelcount\/bin\/python","topology.debug":false,"streamparse.log.max_bytes":1000000,"streamparse.log.level":"info","topology.max.spout.pending":5000,"topology.message.timeout.secs":60}
2941 [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: pixelcount

I'm wondering if maybe that fork of fabric that supposedly adds Python 3 support is not actually working as expected.

@msukmanowsky
Contributor

Thanks for looking this over @dan-blanchard! I made the Py3 changes to tasks.py as well as adopting your sed wizardry in the README. I'm not too concerned with the fact that we have to modify the SSH config for this example; it's just an example, after all. Also, the SSH config mods are needed for two reasons, if I recall correctly (it's been a while now):

  1. Ensure that we can properly SSH to the Vagrant box using the SSH magic Vagrant needs (e.g. the identity file). We can't pass SSH params to streamparse/Vagrant.
  2. The Kafka/ZooKeeper setup required a host name. I can't elaborate too much there, as I really can't recall the issue I fixed, but it was something to do with that.

As for the other Python 3 issues you encountered, I'm not 100% sure what to do there. To clarify, everything works in Python 3, but the fabric output seems to be hidden?

@msukmanowsky
Contributor

BTW, renamed the branch to https://github.com/Parsely/streamparse/tree/feature/jvm-example

@dan-blanchard
Member

As for the other Python 3 issues you encountered, I'm not 100% sure what to do there. To clarify, everything works in Python 3, but the fabric output seems to be hidden?

I haven't been able to get it to work at all with Python 3 on the host machine. I believe the fabric fork we're using doesn't actually work; it just doesn't raise exceptions.

That said, I've taken your example and added onto it a bit, and I'm very close to having a setup where I use Python 2 to submit a topology that uses a Python 3 conda environment in the VM.

@msukmanowsky
Contributor

Awesome! Keep us posted.

@closedLoop

@dan-blanchard @msukmanowsky
I'm not sure where this comment goes, but I believe the topic creation issue disappears if you run (on the machine running Kafka):

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zk --create --topic exampleTopic --partitions 3 --replication-factor 2

This will create the topic exampleTopic with the specified replication factor and partition count. Note: zk is the connection string for the ZooKeeper instance.

My setup is a little different (using Docker rather than Vagrant) but this has helped me.

@msukmanowsky
Contributor

Thanks @closedLoop, I'll give this a try. It could be a bug in how Kafka auto-creates topics, so this would make sense.

@closedLoop

@msukmanowsky it's in the process of being fixed in kafka-python:
dpkp/kafka-python#174
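
In the meantime, a client-side equivalent might be to pre-create the topic before producing anything. A sketch, assuming your kafka-python version exposes KafkaClient.ensure_topic_exists (the connection string is hypothetical):

from kafka import KafkaClient, SimpleProducer

client = KafkaClient('streamparse-box:9092')  # hypothetical broker address
# Polls the broker for metadata until the topic shows up, which also
# triggers auto-creation on brokers configured to allow it.
client.ensure_topic_exists(b'exampleTopic')
producer = SimpleProducer(client)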

@dan-blanchard dan-blanchard modified the milestones: v1.1, v1.2 Apr 9, 2015
@dan-blanchard dan-blanchard removed this from the v1.1 milestone Apr 9, 2015
@amontalenti
Contributor Author

There is another ongoing pull request over at our friends' project Pyleus for integrating Java components via the YAML DSL. See Yelp/pyleus#99

@dan-blanchard
Member

The example we have works fine now, so I'm closing this.

@CaledoniaProject

Should I reopen this one? The kafka-jvm example does not run, as dan removed pixelcount.clj from the topologies directory.

When I run sparse run, it says there's no topology defined.

Can someone please help?

#219

@gwaramadze

gwaramadze commented Aug 16, 2016

Hello, I'm running into this problem when playing around with the Kafka example:

$ invoke seed_kafka
Traceback (most recent call last):
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/tasks.py", line 141, in argspec
    context_arg = arg_names.pop(0)
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/remy/.virtualenvs/storm/bin/invoke", line 11, in <module>
    sys.exit(program.run())
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/program.py", line 269, in run
    self._parse(argv)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/program.py", line 325, in _parse
    self.load_collection()
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/program.py", line 473, in load_collection
    coll = loader.load(coll_name) if coll_name else loader.load()
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/loader.py", line 53, in load
    module = imp.load_module(name, fd, path, desc)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/imp.py", line 235, in load_module
    return load_source(name, filename, file)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/imp.py", line 171, in load_source
    module = methods.load()
  File "<frozen importlib._bootstrap>", line 1220, in load
  File "<frozen importlib._bootstrap>", line 1200, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1129, in _exec
  File "<frozen importlib._bootstrap>", line 1471, in exec_module
  File "<frozen importlib._bootstrap>", line 321, in _call_with_frames_removed
  File "/home/remy/workspace/streamparse/examples/kafka-jvm/tasks.py", line 63, in <module>
    def seed_kafka(kafka_hosts=None, topic_name=None, num_pixels=100000):
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/tasks.py", line 267, in task
    return Task(args[0], **kwargs)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/tasks.py", line 58, in __init__
    self.positional = self.fill_implicit_positionals(positional)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/tasks.py", line 149, in fill_implicit_positionals
    args, spec_dict = self.argspec(self.body)
  File "/home/remy/.virtualenvs/storm/lib/python3.4/site-packages/invoke/tasks.py", line 144, in argspec
    raise TypeError("Tasks must have an initial Context argument!")
TypeError: Tasks must have an initial Context argument!
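
Judging from the last frame, it looks like newer versions of invoke require every task to accept a Context as its first argument, which the example's tasks.py predates. I'm guessing the signature needs to change to something like this (just my guess):

from invoke import task

@task
def seed_kafka(ctx, kafka_hosts=None, topic_name=None, num_pixels=100000):
    # Newer invoke passes a Context object as the first positional
    # argument to every task; without it, Task.argspec raises the
    # TypeError above.
    pass  # ... existing seeding logic unchanged ...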

Any help highly appreciated :)
