Provide example of mixed Java + Python topology using storm-kafka #31
Comments
Is this just about adding an example, or is support for Java + Python lacking at the moment? The discussion on #1 seems to indicate that you decided to use plain jars instead of uberjars, which makes submitting a Java + Python topology impossible (if I understand everything correctly).
It's a bit of both. JVM-based topologies will have dependencies that need to be bundled in the JAR when topologies are submitted to clusters. The problem, as #1 points out, is that we took the easy route by going with
I think this is actually overkill; really, someone just needs to take a little bit of time to figure out the proper Leiningen config to exclude some of the Storm includes from the uberjar. No need for the separate profiles. As for a Kafka example, this is something I've started on a few times, I just haven't found time to finish. Going to commit my changes for it right now if you'd like to poke at it @dan-blanchard.
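For reference, the usual way to keep Storm itself out of the uberjar is Leiningen's `:provided` profile: the dependency is on the compile classpath but excluded from `lein uberjar` output, since the cluster supplies Storm at runtime. A rough sketch (project name, coordinates, and versions here are illustrative assumptions, not the actual example's config):

```clojure
;; project.clj (sketch; 0.9.x-era Storm coordinates assumed)
(defproject my-topology "0.0.1-SNAPSHOT"
  ;; Dependencies that DO belong in the uberjar, e.g. the Kafka spout:
  :dependencies [[org.apache.storm/storm-kafka "0.9.2-incubating"]]
  ;; storm-core is already on the cluster's classpath, so mark it
  ;; :provided -- available for compilation, excluded from the uberjar.
  :profiles {:provided {:dependencies
                        [[org.apache.storm/storm-core "0.9.2-incubating"]]}}
  :aot :all)
```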
Example added. I haven't gotten this running yet @dan-blanchard, but feel free to look around if you'd like.
Thanks for adding that @msukmanowsky. I'm sure it'll come in handy. I'm trying to create a topology with Java, Perl, and Python bolts, so I guess I'll have to figure out the uberjar thing sooner rather than later.
Note: there is a whole lot more documentation online about Storm's built-in Kafka spout; documentation and code were heavily updated for the forthcoming 0.9.3 release: https://github.com/apache/incubator-storm/tree/master/external/storm-kafka
Full example has been added here: https://github.com/Parsely/streamparse/tree/feature/uberjar/examples/kafka-jvm. It installs a Vagrant-run server and uses that for seeding a Kafka topic, then reading from it with a JVM-based KafkaSpout. Would love to get help testing this @amontalenti and @dan-blanchard. I've already asked @kbourgoin.
Thanks for putting this together @msukmanowsky! I'll take a look at this soon.
@msukmanowsky I finally had a chance to play with this today. Unfortunately, I've run into a few Python 3 compatibility issues with it. In
Even after making all those changes (and using the dev version of kafka-python that adds support for Python 3), I'm still running into this error:
Seeding Kafka (streamparse-box:9092) topic 'pixels' with 100,000 fake pixels.
Traceback (most recent call last):
File "/Users/dblanchard/anaconda/envs/streamparse/bin/invoke", line 6, in <module>
sys.exit(main())
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/cli.py", line 295, in main
dispatch(sys.argv)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/cli.py", line 288, in dispatch
return executor.execute(*tasks, dedupe=dedupe)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/executor.py", line 89, in execute
task=task, name=name, args=args, kwargs=kwargs
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/executor.py", line 128, in _execute
return task(*args, **kwargs)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/invoke/tasks.py", line 108, in __call__
result = self.body(*args, **kwargs)
File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 32, in wrapper
raise e
File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 29, in wrapper
return func(*args, **kwargs)
File "/Users/dblanchard/Documents/streamparse/examples/kafka-jvm/tasks.py", line 78, in seed_kafka
producer.send_messages(topic_name.encode('utf-8'), pixel)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/producer/simple.py", line 69, in send_messages
partition = self._next_partition(topic)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/producer/simple.py", line 56, in _next_partition
self.client.load_metadata_for_topics(topic)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/client.py", line 323, in load_metadata_for_topics
kafka.common.check_error(topic_metadata)
File "/Users/dblanchard/anaconda/envs/streamparse/lib/python3.4/site-packages/kafka/common.py", line 215, in check_error
raise error(response)
kafka.common.LeaderNotAvailableError: TopicMetadata(topic=b'pixels', error=5, partitions=[])
That error looks a lot like the one referenced here, but even after adding their suggested workaround, I'm still seeing the same problem.
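For what it's worth, `LeaderNotAvailableError` (error code 5) is often transient: when the broker auto-creates a topic on first contact, it returns this error until a partition leader has been elected, so retrying the send after a short delay frequently works around it. A minimal, generic retry helper (a sketch; the commented-out usage with kafka-python's `send_messages` is an assumption matching the traceback above):

```python
import time


def retry_on_error(func, error_cls, attempts=5, delay=0.5):
    """Call func(), retrying up to `attempts` times when error_cls is raised.

    Useful for transient errors like kafka.common.LeaderNotAvailableError,
    which can occur while a freshly auto-created topic elects a leader.
    """
    for attempt in range(attempts):
        try:
            return func()
        except error_cls:
            if attempt == attempts - 1:
                raise  # out of retries; surface the original error
            time.sleep(delay)


# Hypothetical usage against the producer from the traceback (not run here):
# retry_on_error(
#     lambda: producer.send_messages(topic_name.encode('utf-8'), pixel),
#     LeaderNotAvailableError)
```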
As for the README, I'd prefer to see the bit about modifying the SSH config just replaced by a suggestion to run:

vagrant ssh-config | sed -e 's/Host default/Host streamparse-box/' -e '/HostName 127.0.0.1/d' -e 's/Port 2222/Port 22/' -e 's/LogLevel FATAL/LogLevel INFO/' >> ~/.ssh/config

It is also somewhat problematic that people need to modify

vagrant ssh-config | sed -e 's/Host default/Host streamparse-box/' -e 's/HostName 127.0.0.1/HostName 192.168.50.50/' -e 's/Port 2222/Port 22/' -e 's/LogLevel FATAL/LogLevel INFO/' >> ~/.ssh/config

so that there is an SSH alias to 192.168.50.50. I may be wrong, but it seems like the only reason you need the
I thought I'd just move on and see if
This may be a weird network issue on my side. I can access the JAR directly from Maven central, just not via Leiningen for some reason.
I manually downloaded the snappy-java jar and put it in
Weirdly enough, after submitting the topology and having it fail because the topic wasn't set up, I could then run
There are still some remaining Python 3 compatibility issues here, even with the changes I recommended. I just realized that there are a lot of missing messages when you run
This is the entirety of the output for
I'm wondering if maybe that fork of fabric that supposedly adds Python 3 support is not actually working as expected.
Thanks for looking this over @dan-blanchard! I made the Py3 changes to
As for the other Python 3 issues you encountered, I'm not 100% sure what to do there. To clarify: everything works in Python 3, but the fabric output seems to be hidden?
BTW, I renamed the branch to https://github.com/Parsely/streamparse/tree/feature/jvm-example
I haven't been able to get it to work at all with Python 3 on the host machine. I believe the fabric fork we're using doesn't actually work; it just doesn't raise exceptions. That said, I've taken your example and added onto it a bit, and I'm very close to having a setup where I use Python 2 to submit a topology that uses a Python 3 conda environment in the VM.
Awesome! Keep us posted.
@dan-blanchard @msukmanowsky
This will create the topic
My setup is a little different (using Docker rather than Vagrant) but this has helped me.
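For anyone following along: creating the topic explicitly before submitting the topology sidesteps the auto-creation race entirely. A typical invocation with Kafka's bundled CLI scripts of that era (the install path, ZooKeeper address, and topic name are assumptions matching the example, not taken from the thread):

```shell
# Run on the box where Kafka is installed (e.g. via `vagrant ssh` or
# inside the Docker container). Flags follow the pre-0.9 kafka-topics.sh.
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic pixels
```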
Thanks @closedLoop, I'll give this a try. Could be a bug in how Kafka auto-creates topics, so this would make sense.
@msukmanowsky it's in the process of being fixed in
There is another ongoing pull request over at our Pyleus friends' repo for integrating Java components via the YAML DSL. See Yelp/pyleus#99
The example we have works fine now, so I'm closing this.
Should I reopen this one? The kafka-jvm example does not run, as Dan removed pixelcount.clj from the topology directory. When I run
Could someone please help?
Hello, I'm running into this problem when playing around with the kafka example:
Any help highly appreciated :) |
Requested on the mailing list:
The best first example would be to use the new official Kafka spout (storm-kafka) that is now included in Storm's 0.9.2 release. This combined with Python bolts processing the data from the Kafka spout would be the best first Python + Java interop example for streamparse, I think.
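For context, a mixed topology definition of that sort could look roughly like the following, using the Clojure topology DSL streamparse relied on at the time. This is an illustrative sketch only: `make-kafka-spout` is a hypothetical helper standing in for the `SpoutConfig`/`ZkHosts` setup storm-kafka requires, and the bolt module path, stream fields, and whether `spout-spec` is re-exported by `streamparse.specs` are all assumptions:

```clojure
;; topologies/pixelcount.clj (illustrative sketch, not working code)
(ns pixelcount
  (:use [streamparse.specs])
  (:gen-class))

(defn pixelcount [options]
  [;; JVM spout: storm-kafka's KafkaSpout consuming the "pixels" topic.
   ;; make-kafka-spout is a hypothetical helper wrapping SpoutConfig setup.
   {"pixel-spout" (spout-spec (make-kafka-spout options))}
   ;; Python bolt: a streamparse bolt subscribed to the JVM spout.
   {"count-bolt" (python-bolt-spec options
                                   {"pixel-spout" :shuffle}
                                   "bolts.pixel_count.PixelCountBolt"
                                   ["url" "count"]
                                   :p 2)}])
```

The key point of the interop is that the spout spec wraps a Java object directly while the bolt spec names a Python class run via Storm's multi-lang protocol; the two communicate through ordinary Storm streams.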