Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Fix for missing physical plan in UI #3786

Merged
merged 85 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
8536fde
Fix for missing physical plan in UI
nicknezis Mar 2, 2022
9cd8e23
Fix the Heron UI timeline metrics
nicknezis Mar 2, 2022
6c84aa8
Reverting the tracker API back to previous calls
nicknezis Mar 3, 2022
dd23309
More cleanup to make the UI metricstimeline work
nicknezis Mar 3, 2022
7e3eeb4
Cleaning up the REST API structure
nicknezis Mar 4, 2022
feb4fe6
Various heron-tracker fixes
nicknezis Mar 6, 2022
32d339e
More fixes
nicknezis Mar 7, 2022
c7dcdb2
Rolling back a change
nicknezis Mar 7, 2022
c95e926
Typo fix
nicknezis Mar 7, 2022
2672575
typo
thinker0 Mar 8, 2022
ab144e3
Add check eliments
thinker0 Mar 8, 2022
a67e4f2
Fix typos (#3788)
thinker0 Mar 8, 2022
42acd57
Merge remote-tracking branch 'thinker/tracker-fixes-1' into nicknezis…
nicknezis Mar 9, 2022
6b93384
Fix name mismatch
nicknezis Mar 9, 2022
3542351
Removed EnvelopeAPI Router which was not working
nicknezis Mar 9, 2022
0560c8e
More fixes
nicknezis Mar 14, 2022
5dc7d2e
Updated AckingTopology to not burn so much CPU
nicknezis Mar 14, 2022
59ce8b4
More potential fixes
nicknezis Mar 15, 2022
5388489
Update heron/tools/common/src/python/clients/tracker.py
nicknezis Mar 26, 2022
d92ad26
Update heron/tools/common/src/python/clients/tracker.py
nicknezis Mar 26, 2022
72769a3
Update heron/tools/tracker/src/python/app.py
nicknezis Mar 26, 2022
a1d46ab
Update heron/tools/tracker/src/python/tracker.py
nicknezis Mar 26, 2022
eddf0a3
Update heron/tools/tracker/src/python/app.py
nicknezis Mar 26, 2022
322818f
Merge commit '8841d1c4d6b3156a40ffed856fb32c1ceb682855' into nicknezi…
nicknezis Mar 26, 2022
234240c
Fixing the Tracker client URIs
nicknezis Mar 27, 2022
771892a
Whitespace style check
nicknezis Mar 27, 2022
64e35fe
[Tracker|Shell] cleaning up imports.
surahman Mar 27, 2022
cf77242
[Tracker] whitespace fix.
surahman Mar 27, 2022
9aa4091
[Tracker] setting global Log
surahman Mar 27, 2022
4b17564
[UI] setting global Log
surahman Mar 27, 2022
f417981
[UI] white space fixes.
surahman Mar 27, 2022
69528d0
[UI] switching to `None` check for failure.
surahman Mar 28, 2022
fc8efab
[UI] fixing issues with dict causing CI errors.
surahman Mar 28, 2022
4acae44
Added tracker server performance timing info in header
nicknezis Apr 2, 2022
56416b2
Fix for metrics timeline
nicknezis Apr 2, 2022
cbac2df
Updated the unit tests
nicknezis Apr 2, 2022
1723c15
Updated pylint to fix Python 3.9 issues
nicknezis Apr 2, 2022
df97725
Whole lot of formatting changes
nicknezis Apr 3, 2022
1533dea
Fix compile error
nicknezis Apr 3, 2022
028f87f
[Examples] word spout syntax fix.
surahman Apr 3, 2022
4cdfa6d
[Style] utils.topology.topology_context_impl
surahman Apr 3, 2022
30d7b6d
[Style] utils.misc.serializer_helper
surahman Apr 3, 2022
aa052d3
[Style] utils.misc.pplan_helper
surahman Apr 3, 2022
fcc7408
[Style] utils.misc.communicator
surahman Apr 3, 2022
3ef6cce
[Style] utils.metrics.py_metrics
surahman Apr 3, 2022
9846985
[Style] utils.metrics.metrics_helper
surahman Apr 3, 2022
2b825c6
[Style] network.socket_options
surahman Apr 3, 2022
0c8d2fc
[Style] network.protocol
surahman Apr 3, 2022
7a8927e
[Style] network.metricsmgr_client
surahman Apr 3, 2022
4ba503c
[Style] network.heron_client
surahman Apr 3, 2022
ebd9f75
[Style] network.gateway_looper
surahman Apr 3, 2022
fc3d531
More style fixes
nicknezis Apr 3, 2022
0d3e833
[Style] instance
surahman Apr 3, 2022
8456e58
[Style] basics.spout_instance
surahman Apr 3, 2022
3967c80
[Style] basics.bolt_instance
surahman Apr 3, 2022
47682fd
[Style] basics.base_instance
surahman Apr 3, 2022
e7e673e
[Style] utils.metrics.py_metrics
surahman Apr 3, 2022
7417292
[Style] fixes to super() calls.
surahman Apr 3, 2022
483915e
Merge branch 'master' into nicknezis/tracker-fixes
surahman Apr 3, 2022
b9fd15f
More style cleanup
nicknezis Apr 3, 2022
6b87e26
More lint related style fixes
nicknezis Apr 3, 2022
731fb95
More fixes
nicknezis Apr 3, 2022
d9b1a92
More fixes
nicknezis Apr 3, 2022
34e3759
Typo fix
nicknezis Apr 3, 2022
1a3e1b7
More fixes
nicknezis Apr 3, 2022
ffc1af5
More fixes
nicknezis Apr 3, 2022
df11d8c
One more fix
nicknezis Apr 3, 2022
8af12c0
More fixes
nicknezis Apr 4, 2022
708734d
Fixing local integration test
nicknezis Apr 4, 2022
c8d35a6
Convert memory value to int to fix test issue
nicknezis Apr 7, 2022
32a67dd
More style fixes and type conversion to correct string interpolation
nicknezis Apr 7, 2022
c395071
Another fix to enforce `int` type
nicknezis Apr 7, 2022
1a4fe04
Typo fix
nicknezis Apr 7, 2022
1cf27d9
Remove import that IDE auto added
nicknezis Apr 7, 2022
e30e6ba
Update heron/statemgrs/src/python/statemanager.py
nicknezis Apr 8, 2022
52c9401
Update heronpy/connectors/pulsar/pulsarstreamlet.py
nicknezis Apr 8, 2022
1525256
Fixes based on feedback
nicknezis Apr 8, 2022
ba1aaec
Putting back to original logic
nicknezis Apr 8, 2022
403f32c
[UI] removing global reference to Log
surahman Apr 8, 2022
4492c48
timeline values seem to be of float type
nicknezis Apr 9, 2022
54af1df
More fixes
nicknezis Apr 10, 2022
aa896a9
Whitespace fix
nicknezis Apr 10, 2022
0abd309
Updating dictionary creation to bypass intermediate list creation
nicknezis Apr 10, 2022
dda9349
Returning the logic to original form with direct return of dict
nicknezis Apr 11, 2022
5df20a8
typo (#3814)
thinker0 Apr 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions bazel_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ def discover_git_branch():
# Utility functions for system defines
######################################################################
def define_string(name, value):
return '#define %s "%s"\n' % (name, value)
return f'#define {name} "{value}"\n'

def define_value(name, value):
return '#define %s %s\n' % (name, value)
return f'#define {name} {value}\n'

######################################################################
# Discover where a program is located using the PATH variable
Expand Down Expand Up @@ -144,7 +144,7 @@ def real_program_path(program_name):
return None

def fail(message):
print("\nFAILED: %s" % message)
print(f"\nFAILED: {message}")
sys.exit(1)

# Assumes the version is at the end of the first line consisting of digits and dots
Expand All @@ -158,7 +158,7 @@ def discover_version(path):
version_flag = "-V"
else:
version_flag = "--version"
command = "%s %s" % (path, version_flag)
command = f"{path} {version_flag}"
version_output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
first_line = version_output.decode('ascii', 'ignore').split("\n")[0]
version = get_trailing_version(first_line)
Expand Down Expand Up @@ -215,12 +215,12 @@ def discover_version(path):
return version


fail ("Could not determine the version of %s from the following output\n%s\n%s" % (path, command, version_output))
fail (f"Could not determine the version of {path} from the following output\n{command}\n{version_output}")

def to_semver(version):
# is version too short
if re.search('^[\d]+\.[\d]+$', version):
return "%s.0" % version
return f"{version}.0"

# is version too long
version_search = re.search('^([\d]+\.[\d]+\.[\d]+)\.[\d]+$', version)
Expand All @@ -231,8 +231,8 @@ def to_semver(version):

def assert_min_version(path, min_version):
version = discover_version(path)
if not semver.match(to_semver(version), ">=%s" % to_semver(min_version)):
fail("%s is version %s which is less than the required version %s" % (path, version, min_version))
if not semver.match(to_semver(version), f">={to_semver(min_version)}"):
fail(f"{path} is version {version} which is less than the required version {min_version}")
return version

######################################################################
Expand Down Expand Up @@ -267,17 +267,17 @@ def make_executable(path):
def discover_tool(program, msg, envvar, min_version = ''):
VALUE = discover_program(program, envvar)
if not VALUE:
fail("""You need to have %s installed to build Heron.
Note: Some vendors install %s with a versioned name
(like /usr/bin/%s-4.8). You can set the %s environment
variable to specify the full path to yours.'""" % (program, program, program, envvar))
fail(f"""You need to have {program} installed to build Heron.
Note: Some vendors install {program} with a versioned name
(like /usr/bin/{program}-4.8). You can set the {envvar} environment
variable to specify the full path to yours.'""")

print_value = VALUE
if min_version:
version = assert_min_version(VALUE, min_version)
print_value = "%s (%s)" % (VALUE, version)
print_value = f"{VALUE} ({version})"

print('Using %s:\t%s' % (msg.ljust(20), print_value))
print(f'Using {msg.ljust(20)}:\t{print_value}')
return VALUE

def discover_jdk():
Expand All @@ -290,7 +290,7 @@ def discover_jdk():
"You can set the JAVA_HOME environment variavle to specify the full path to yours.")
jdk_bin_path = os.path.dirname(javac_path)
jdk_path = os.path.dirname(jdk_bin_path)
print('Using %s:\t%s' % ('JDK'.ljust(20), jdk_path))
print(f"Using {'JDK'.ljust(20)}:\t{jdk_path}")
return jdk_path

def test_venv():
Expand All @@ -312,14 +312,14 @@ def discover_tool_default(program, msg, envvar, defvalue):
VALUE = discover_program(program, envvar)
if not VALUE:
VALUE = defvalue
print('%s:\tnot found, but ok' % (program.ljust(26)))
print(f'{program.ljust(26)}:\tnot found, but ok')
else:
print('Using %s:\t%s' % (msg.ljust(20), VALUE))
print(f'Using {msg.ljust(20)}:\t{VALUE}')
return VALUE

def export_env_to_file(out_file, env):
if env in os.environ:
out_file.write('export %s="%s"\n' % (env, os.environ[env]))
out_file.write(f'export {env}="{os.environ[env]}"\n')

######################################################################
# Generate the shell script that recreates the environment
Expand Down Expand Up @@ -348,7 +348,7 @@ def write_env_exec_file(platform, environ):
out_file.write('$*')

make_executable(env_exec_file)
print('Wrote the environment exec file %s' % (env_exec_file))
print(f'Wrote the environment exec file {env_exec_file}')


######################################################################
Expand Down Expand Up @@ -385,13 +385,13 @@ def write_heron_config_header(config_file):
out_file.write(define_string('GIT_BRANCH', discover_git_branch()))
out_file.write(generate_system_defines())
out_file.close()
print('Wrote the heron config header file: \t"%s"' % (config_file))
print(f'Wrote the heron config header file: \t"{config_file}"')

######################################################################
# MAIN program that sets up your workspace for bazel
######################################################################
def main():
env_map = dict()
env_map = {}

# Discover the platform
platform = discover_platform()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.heron.examples.api;

import java.time.Duration;
import java.util.Map;
import java.util.Random;

Expand All @@ -36,6 +37,7 @@
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.common.basics.SysUtils;

/**
* This is a basic example of a Heron topology with acking enable.
Expand All @@ -53,7 +55,7 @@ public static void main(String[] args) throws Exception {

int spouts = 2;
int bolts = 2;
builder.setSpout("word", new AckingTestWordSpout(), spouts);
builder.setSpout("word", new AckingTestWordSpout(Duration.ofMillis(200)), spouts);
builder.setBolt("exclaim1", new ExclamationBolt(), bolts)
.shuffleGrouping("word");

Expand Down Expand Up @@ -97,8 +99,10 @@ public static class AckingTestWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] words;
private Random rand;
private final Duration throttleDuration;

public AckingTestWordSpout() {
public AckingTestWordSpout(Duration throttleDuration) {
this.throttleDuration = throttleDuration;
}

@SuppressWarnings("rawtypes")
Expand All @@ -116,7 +120,9 @@ public void close() {

public void nextTuple() {
final String word = words[rand.nextInt(words.length)];

if (!throttleDuration.isZero()) {
SysUtils.sleep(throttleDuration); // sleep to throttle back CPU usage
}
// To enable acking, we need to emit each tuple with a MessageId, which is an Object.
// Each new message emitted needs to be annotated with a unique ID, which allows
// the spout to keep track of which messages should be acked back to the producer or
Expand Down
2 changes: 1 addition & 1 deletion examples/src/python/bolt/consume_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Total received data tuple: %d" % self.total)
self.log(f"Total received data tuple: {self.total}")
8 changes: 4 additions & 4 deletions examples/src/python/bolt/count_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

"""module for example bolt: CountBolt"""
from collections import Counter
import heronpy.api.global_metrics as global_metrics
from heronpy.api import global_metrics
from heronpy.api.bolt.bolt import Bolt

# pylint: disable=unused-argument
Expand All @@ -34,8 +34,8 @@ def initialize(self, config, context):
self.counter = Counter()
self.total = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def _increment(self, word, inc_by):
self.counter[word] += inc_by
Expand All @@ -49,4 +49,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Current map: %s" % str(self.counter))
self.log(f"Current map: {str(self.counter)}")
6 changes: 3 additions & 3 deletions examples/src/python/bolt/half_ack_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def initialize(self, config, context):
def process(self, tup):
self.total += 1
if self.total % 2 == 0:
self.logger.debug("Failing a tuple: %s" % str(tup))
self.logger.debug("Failing a tuple: %s", str(tup))
self.fail(tup)
else:
self.logger.debug("Acking a tuple: %s" % str(tup))
self.logger.debug("Acking a tuple: %s", str(tup))
self.ack(tup)

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Total received: %d" % self.total)
self.log(f"Total received: {self.total}")
8 changes: 4 additions & 4 deletions examples/src/python/bolt/stateful_count_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"""module for example bolt: CountBolt"""
from collections import Counter

import heronpy.api.global_metrics as global_metrics
from heronpy.api import global_metrics
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.state.stateful_component import StatefulComponent

Expand All @@ -34,19 +34,19 @@ class StatefulCountBolt(Bolt, StatefulComponent):
# pylint: disable=attribute-defined-outside-init
def init_state(self, stateful_state):
self.recovered_state = stateful_state
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))
self.logger.info(f"Checkpoint Snapshot recovered : {str(self.recovered_state)}")

def pre_save(self, checkpoint_id):
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
self.logger.info(f"Checkpoint Snapshot {checkpoint_id} : {str(self.recovered_state)}")

def initialize(self, config, context):
self.logger.info("In prepare() of CountBolt")
self.counter = Counter()
self.total = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info(f"Component-specific config: \n{str(config)}")

def _increment(self, word, inc_by):
self.counter[word] += inc_by
Expand Down
2 changes: 1 addition & 1 deletion examples/src/python/bolt/stream_aggregate_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Current stream counter: %s" % str(self.stream_counter))
self.log(f"Current stream counter: {str(self.stream_counter)}")
4 changes: 2 additions & 2 deletions examples/src/python/bolt/window_size_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class WindowSizeBolt(SlidingWindowBolt):
A bolt that calculates the average batch size of window"""

def initialize(self, config, context):
super(WindowSizeBolt, self).initialize(config, context)
super().initialize(config, context)
self.numerator = 0.0
self.denominator = 0.0

def processWindow(self, window_info, tuples):
self.numerator += len(tuples)
self.denominator += 1
self.logger.info("The current average is %f" % (self.numerator / self.denominator))
self.logger.info(f"The current average is {(self.numerator / self.denominator)}")
6 changes: 3 additions & 3 deletions examples/src/python/spout/multi_stream_spout.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ def initialize(self, config, context):

self.emit_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def next_tuple(self):
word = next(self.words)
self.emit([word])
self.emit_count += 1

if self.emit_count % 100000 == 0:
self.logger.info("Emitted %s" % str(self.emit_count))
self.logger.info(f"Emitted {str(self.emit_count)}")
self.logger.info("Emitting to error stream")
self.emit(["test error message"], stream='error')
6 changes: 3 additions & 3 deletions examples/src/python/spout/stateful_word_spout.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ class StatefulWordSpout(Spout, StatefulComponent):
# pylint: disable=attribute-defined-outside-init
def init_state(self, stateful_state):
self.recovered_state = stateful_state
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))
self.logger.info(f"Checkpoint Snapshot recovered : {str(self.recovered_state)}")

def pre_save(self, checkpoint_id):
# Purely for debugging purposes
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
self.logger.info(f"Checkpoint Snapshot {checkpoint_id} : {str(self.recovered_state)}")

# pylint: disable=unused-argument
def initialize(self, config, context):
Expand All @@ -52,7 +52,7 @@ def initialize(self, config, context):
self.ack_count = 0
self.fail_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info(f"Component-specific config: \n{str(config)}")

def next_tuple(self):
word = next(self.words)
Expand Down
8 changes: 4 additions & 4 deletions examples/src/python/spout/word_spout.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def initialize(self, config, context):
self.ack_count = 0
self.fail_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def next_tuple(self):
word = next(self.words)
Expand All @@ -50,9 +50,9 @@ def next_tuple(self):
def ack(self, tup_id):
self.ack_count += 1
if self.ack_count % 100000 == 0:
self.logger.info("Acked %sth tuples, tup_id: %s" % (str(self.ack_count), str(tup_id)))
self.logger.info(f"Acked {str(self.ack_count)}th tuples, tup_id: {str(tup_id)}")

def fail(self, tup_id):
self.fail_count += 1
if self.fail_count % 100000 == 0:
self.logger.info("Failed %sth tuples, tup_id: %s" % (str(self.fail_count), str(tup_id)))
self.logger.info(f"Failed {str(self.fail_count)}th tuples, tup_id: {str(tup_id)}")
Loading