Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Fix for missing physical plan in UI #3786

Merged
merged 85 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
8536fde
Fix for missing physical plan in UI
nicknezis Mar 2, 2022
9cd8e23
Fix the Heron UI timeline metrics
nicknezis Mar 2, 2022
6c84aa8
Reverting the tracker API back to previous calls
nicknezis Mar 3, 2022
dd23309
More cleanup to make the UI metricstimeline work
nicknezis Mar 3, 2022
7e3eeb4
Cleaning up the REST API structure
nicknezis Mar 4, 2022
feb4fe6
Various heron-tracker fixes
nicknezis Mar 6, 2022
32d339e
More fixes
nicknezis Mar 7, 2022
c7dcdb2
Rolling back a change
nicknezis Mar 7, 2022
c95e926
Typo fix
nicknezis Mar 7, 2022
2672575
typo
thinker0 Mar 8, 2022
ab144e3
Add check eliments
thinker0 Mar 8, 2022
a67e4f2
Fix typos (#3788)
thinker0 Mar 8, 2022
42acd57
Merge remote-tracking branch 'thinker/tracker-fixes-1' into nicknezis…
nicknezis Mar 9, 2022
6b93384
Fix name mismatch
nicknezis Mar 9, 2022
3542351
Removed EnvelopeAPI Router which was not working
nicknezis Mar 9, 2022
0560c8e
More fixes
nicknezis Mar 14, 2022
5dc7d2e
Updated AckingTopology to not burn so much CPU
nicknezis Mar 14, 2022
59ce8b4
More potential fixes
nicknezis Mar 15, 2022
5388489
Update heron/tools/common/src/python/clients/tracker.py
nicknezis Mar 26, 2022
d92ad26
Update heron/tools/common/src/python/clients/tracker.py
nicknezis Mar 26, 2022
72769a3
Update heron/tools/tracker/src/python/app.py
nicknezis Mar 26, 2022
a1d46ab
Update heron/tools/tracker/src/python/tracker.py
nicknezis Mar 26, 2022
eddf0a3
Update heron/tools/tracker/src/python/app.py
nicknezis Mar 26, 2022
322818f
Merge commit '8841d1c4d6b3156a40ffed856fb32c1ceb682855' into nicknezi…
nicknezis Mar 26, 2022
234240c
Fixing the Tracker client URIs
nicknezis Mar 27, 2022
771892a
Whitespace style check
nicknezis Mar 27, 2022
64e35fe
[Tracker|Shell] cleaning up imports.
surahman Mar 27, 2022
cf77242
[Tracker] whitespace fix.
surahman Mar 27, 2022
9aa4091
[Tracker] setting global Log
surahman Mar 27, 2022
4b17564
[UI] setting global Log
surahman Mar 27, 2022
f417981
[UI] white space fixes.
surahman Mar 27, 2022
69528d0
[UI] switching to `None` check for failure.
surahman Mar 28, 2022
fc8efab
[UI] fixing issues with dict causing CI errors.
surahman Mar 28, 2022
4acae44
Added tracker server performance timing info in header
nicknezis Apr 2, 2022
56416b2
Fix for metrics timeline
nicknezis Apr 2, 2022
cbac2df
Updated the unit tests
nicknezis Apr 2, 2022
1723c15
Updated pylint to fix Python 3.9 issues
nicknezis Apr 2, 2022
df97725
Whole lot of formatting changes
nicknezis Apr 3, 2022
1533dea
Fix compile error
nicknezis Apr 3, 2022
028f87f
[Examples] word spout syntax fix.
surahman Apr 3, 2022
4cdfa6d
[Style] utils.topology.topology_context_impl
surahman Apr 3, 2022
30d7b6d
[Style] utils.misc.serializer_helper
surahman Apr 3, 2022
aa052d3
[Style] utils.misc.pplan_helper
surahman Apr 3, 2022
fcc7408
[Style] utils.misc.communicator
surahman Apr 3, 2022
3ef6cce
[Style] utils.metrics.py_metrics
surahman Apr 3, 2022
9846985
[Style] utils.metrics.metrics_helper
surahman Apr 3, 2022
2b825c6
[Style] network.socket_options
surahman Apr 3, 2022
0c8d2fc
[Style] network.protocol
surahman Apr 3, 2022
7a8927e
[Style] network.metricsmgr_client
surahman Apr 3, 2022
4ba503c
[Style] network.heron_client
surahman Apr 3, 2022
ebd9f75
[Style] network.gateway_looper
surahman Apr 3, 2022
fc3d531
More style fixes
nicknezis Apr 3, 2022
0d3e833
[Style] instance
surahman Apr 3, 2022
8456e58
[Style] basics.spout_instance
surahman Apr 3, 2022
3967c80
[Style] basics.bolt_instance
surahman Apr 3, 2022
47682fd
[Style] basics.base_instance
surahman Apr 3, 2022
e7e673e
[Style] utils.metrics.py_metrics
surahman Apr 3, 2022
7417292
[Style] fixes to super() calls.
surahman Apr 3, 2022
483915e
Merge branch 'master' into nicknezis/tracker-fixes
surahman Apr 3, 2022
b9fd15f
More style cleanup
nicknezis Apr 3, 2022
6b87e26
More lint related style fixes
nicknezis Apr 3, 2022
731fb95
More fixes
nicknezis Apr 3, 2022
d9b1a92
More fixes
nicknezis Apr 3, 2022
34e3759
Typo fix
nicknezis Apr 3, 2022
1a3e1b7
More fixes
nicknezis Apr 3, 2022
ffc1af5
More fixes
nicknezis Apr 3, 2022
df11d8c
One more fix
nicknezis Apr 3, 2022
8af12c0
More fixes
nicknezis Apr 4, 2022
708734d
Fixing local integration test
nicknezis Apr 4, 2022
c8d35a6
Convert memory value to int to fix test issue
nicknezis Apr 7, 2022
32a67dd
More style fixes and type conversion to correct string interpolation
nicknezis Apr 7, 2022
c395071
Another fix to enforce `int` type
nicknezis Apr 7, 2022
1a4fe04
Typo fix
nicknezis Apr 7, 2022
1cf27d9
Remove import that IDE auto added
nicknezis Apr 7, 2022
e30e6ba
Update heron/statemgrs/src/python/statemanager.py
nicknezis Apr 8, 2022
52c9401
Update heronpy/connectors/pulsar/pulsarstreamlet.py
nicknezis Apr 8, 2022
1525256
Fixes based on feedback
nicknezis Apr 8, 2022
ba1aaec
Putting back to original logic
nicknezis Apr 8, 2022
403f32c
[UI] removing global reference to Log
surahman Apr 8, 2022
4492c48
timeline values seem to be of float type
nicknezis Apr 9, 2022
54af1df
More fixes
nicknezis Apr 10, 2022
aa896a9
Whitespace fix
nicknezis Apr 10, 2022
0abd309
Updating dictionary creation to bypass intermediate list creation
nicknezis Apr 10, 2022
dda9349
Returning the logic to original form with direct return of dict
nicknezis Apr 11, 2022
5df20a8
typo (#3814)
thinker0 Apr 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.heron.examples.api;

import java.time.Duration;
import java.util.Map;
import java.util.Random;

Expand All @@ -36,6 +37,7 @@
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.common.basics.SysUtils;

/**
* This is a basic example of a Heron topology with acking enable.
Expand All @@ -53,7 +55,7 @@ public static void main(String[] args) throws Exception {

int spouts = 2;
int bolts = 2;
builder.setSpout("word", new AckingTestWordSpout(), spouts);
builder.setSpout("word", new AckingTestWordSpout(Duration.ofMillis(200)), spouts);
builder.setBolt("exclaim1", new ExclamationBolt(), bolts)
.shuffleGrouping("word");

Expand Down Expand Up @@ -97,8 +99,10 @@ public static class AckingTestWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] words;
private Random rand;
private final Duration throttleDuration;

public AckingTestWordSpout() {
public AckingTestWordSpout(Duration throttleDuration) {
this.throttleDuration = throttleDuration;
}

@SuppressWarnings("rawtypes")
Expand All @@ -116,7 +120,9 @@ public void close() {

public void nextTuple() {
final String word = words[rand.nextInt(words.length)];

if (!throttleDuration.isZero()) {
SysUtils.sleep(throttleDuration); // sleep to throttle back CPU usage
}
// To enable acking, we need to emit each tuple with a MessageId, which is an Object.
// Each new message emitted needs to be annotated with a unique ID, which allows
// the spout to keep track of which messages should be acked back to the producer or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
Client client = ClientBuilder.newClient();

this.baseTarget = client.target(trackerURL)
.path("topologies/metricstimeline")
.path("topologies/metrics/timeline")
.queryParam("cluster", cluster)
.queryParam("environ", environ)
.queryParam("topology", topologyName);
Expand Down
59 changes: 31 additions & 28 deletions heron/shell/src/python/handlers/downloadhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@


''' downloadhandler.py '''
import mimetypes
import os
import logging
from tornado import web, iostream, gen
import tornado.web
import anticrlf

from heron.shell.src.python import utils

class DownloadHandler(web.RequestHandler):
class DownloadHandler(tornado.web.RequestHandler):
"""
Responsible for downloading the files.
"""
Expand All @@ -43,37 +42,41 @@ async def get(self, path):

logger.debug("request to download: %s", path)

# If the file is large, we want to abandon downloading
# if user cancels the requests.
# pylint: disable=attribute-defined-outside-init
self.connection_closed = False

self.set_header("Content-Disposition", "attachment")
if not utils.check_path(path):
self.write("Only relative paths are allowed")
self.set_status(403)
await self.finish("Only relative paths are allowed")
self.finish()
return

if path is None or not os.path.isfile(path):
self.write("File %s not found" % path)
self.set_status(404)
await self.finish("File %s not found" % path)
self.finish()
return

chunk_size = int(4 * 1024 * 1024)
content_type = mimetypes.guess_type(path)
self.set_header("Content-Type", content_type[0])
with open(path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
try:
self.write(chunk) # write the chunk to response
await self.flush() # send the chunk to client
except iostream.StreamCloseError:
# this means the client has closed the connection
# so break the loop
break
finally:
# deleting the chunk is very important because
# if many client are downloading files at the
# same time, the chunks in memory will keep
# increasing and will eat up the RAM
del chunk
# pause the coroutine so other handlers can run
await gen.sleep(0.000000001) # 1 nanosecond
length = int(4 * 1024 * 1024)
offset = int(0)
while True:
data = await utils.read_chunk(path, offset=offset, length=length, escape_data=False)
if self.connection_closed or 'data' not in data or len(data['data']) < length:
break
offset += length
self.write(data['data'])
self.flush()

if 'data' in data:
self.write(data['data'])
self.finish()

def on_connection_close(self):
'''
:return:
'''
# pylint: disable=attribute-defined-outside-init
self.connection_closed = True
13 changes: 8 additions & 5 deletions heron/shell/src/python/handlers/pidhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@


''' pidhandler.py '''
import json
import subprocess
import tornado.web

from heron.shell.src.python import utils

class PidHandler(tornado.web.RequestHandler):
"""
Responsible for getting the process ID for an instance.
Expand All @@ -33,5 +31,10 @@ class PidHandler(tornado.web.RequestHandler):
# pylint: disable=attribute-defined-outside-init
async def get(self, instance_id):
''' get method '''
self.content_type = 'application/json'
await self.finish(json.dumps(utils.chain([['cat', "%s.pid" % instance_id]])).strip())
pid = subprocess.run(['cat', "%s.pid" % instance_id], capture_output=True, text=True,
check=True)
await self.finish({
'command': ' '.join(pid.args),
'stdout': pid.stdout,
'stderr': pid.stderr,
})
2 changes: 1 addition & 1 deletion heron/shell/src/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def run(url_to_handlers=default_handlers):
AsyncHTTPClient.configure(None, defaults=dict(request_timeout=120.0))
app = tornado.web.Application(url_to_handlers)
app.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()

if __name__ == '__main__':
run()
16 changes: 0 additions & 16 deletions heron/shell/src/python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
# under the License.

''' utils.py '''
import functools
import grp
import os
import pkgutil
Expand Down Expand Up @@ -177,21 +176,6 @@ def str_cmd(cmd, cwd, env):
stdout, stderr = stdout_builder.result(), stderr_builder.result()
return {'command': ' '.join(cmd), 'stderr': stderr, 'stdout': stdout}

# pylint: disable=unnecessary-lambda
def chain(cmd_list):
"""
Feed output of one command to the next and return final output
Returns string output of chained application of commands.
"""
command = ' | '.join([' '.join(x) for x in cmd_list])
chained_proc = functools.reduce(pipe, [None] + cmd_list)
stdout_builder = proc.async_stdout_builder(chained_proc)
chained_proc.wait()
return {
'command': command,
'stdout': stdout_builder.result()
}

def get_container_id(instance_id):
''' get container id '''
return instance_id.split('_')[1] # Format: container_<index>_component_name_<index>
Expand Down
32 changes: 18 additions & 14 deletions heron/tools/common/src/python/clients/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

# pylint: disable=bad-whitespace
CLUSTER_URL_FMT = "%s/clusters"

# Nested under /topologies
TOPOLOGIES_URL_FMT = "%s/topologies"
TOPOLOGIES_STATS_URL_FMT = "%s/states" % TOPOLOGIES_URL_FMT
EXECUTION_STATE_URL_FMT = "%s/executionstate" % TOPOLOGIES_URL_FMT
Expand All @@ -53,10 +55,6 @@
PACKINGPLAN_URL_FMT = "%s/packingplan" % TOPOLOGIES_URL_FMT
SCHEDULER_LOCATION_URL_FMT = "%s/schedulerlocation" % TOPOLOGIES_URL_FMT

METRICS_URL_FMT = "%s/metrics" % TOPOLOGIES_URL_FMT
METRICS_QUERY_URL_FMT = "%s/metricsquery" % TOPOLOGIES_URL_FMT
METRICS_TIMELINE_URL_FMT = "%s/metricstimeline" % TOPOLOGIES_URL_FMT

EXCEPTIONS_URL_FMT = "%s/exceptions" % TOPOLOGIES_URL_FMT
EXCEPTION_SUMMARY_URL_FMT = "%s/exceptionsummary" % TOPOLOGIES_URL_FMT

Expand All @@ -66,9 +64,16 @@
JMAP_URL_FMT = "%s/jmap" % TOPOLOGIES_URL_FMT
HISTOGRAM_URL_FMT = "%s/histo" % TOPOLOGIES_URL_FMT

FILE_DATA_URL_FMT = "%s/containerfiledata" % TOPOLOGIES_URL_FMT
FILE_DOWNLOAD_URL_FMT = "%s/containerfiledownload" % TOPOLOGIES_URL_FMT
FILESTATS_URL_FMT = "%s/containerfilestats" % TOPOLOGIES_URL_FMT
# nested under /topologies/metrics/
METRICS_URL_FMT = "%s/metrics" % TOPOLOGIES_URL_FMT
METRICS_QUERY_URL_FMT = "%s/query" % METRICS_URL_FMT
METRICS_TIMELINE_URL_FMT = "%s/timeline" % METRICS_URL_FMT

# nested under /topologies/container/
CONTAINER_URL_FMT = "%s/container" % TOPOLOGIES_URL_FMT
FILE_DATA_URL_FMT = "%s/filedata" % CONTAINER_URL_FMT
FILE_DOWNLOAD_URL_FMT = "%s/filedownload" % CONTAINER_URL_FMT
FILESTATS_URL_FMT = "%s/filestats" % CONTAINER_URL_FMT


def strip_whitespace(s):
Expand Down Expand Up @@ -122,27 +127,26 @@ def strip_whitespace(s):
backpressure=backpressure
)

def api_get(url: str, params=None) -> dict:
def api_get(url: str, params=None) -> Any:
"""Make a GET request to a tracker URL and return the result."""
start = time.time()
try:
Log.debug(f"Requesting URL: {url} with params: {params}")
response = requests.get(url, params)
response.raise_for_status()
except Exception as e:
Log.error(f"Unable to get response from {url}: {e}")
Log.error(f"Unable to get response from {url} with params {params}: {e}")
return None
end = time.time()
data = response.json()
if data["status"] != "success":
Log.error("error from tracker: %s", data["message"])
if response.status_code != requests.codes.ok:
Log.error("error from tracker: %s", response.status_code)
return None

execution = data["executiontime"] * 1000
duration = (end - start) * 1000
Log.debug(f"URL fetch took {execution:.2}ms server time for {url}")
Log.debug(f"URL fetch took {duration:.2}ms round trip time for {url}")

return data["result"]
return data
surahman marked this conversation as resolved.
Show resolved Hide resolved


def create_url(fmt: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion heron/tools/tracker/src/python/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pex_library(
),
reqs = [
"click==7.1.2",
"fastapi==0.62.0",
"fastapi==0.75.0",
"httpx==0.16.1",
"javaobj-py3==0.4.1",
"networkx==2.5",
Expand Down
31 changes: 6 additions & 25 deletions heron/tools/tracker/src/python/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import Dict, List, Optional

from heron.tools.tracker.src.python import constants, state, query
from heron.tools.tracker.src.python.utils import ResponseEnvelope
from heron.tools.tracker.src.python.routers import topologies, container, metrics

from fastapi import FastAPI, Query
Expand Down Expand Up @@ -80,35 +79,22 @@ async def shutdown_event():
"""Stop recieving topology updates."""
state.tracker.stop_sync()


@app.exception_handler(Exception)
async def handle_exception(_, exc: Exception):
payload = ResponseEnvelope[str](
result=None,
execution_time=0.0,
message=f"request failed: {exc}",
status=constants.RESPONSE_STATUS_FAILURE
)
message = f"request failed: {exc}"
status_code = 500
if isinstance(exc, StarletteHTTPException):
status_code = exc.status_code
if isinstance(exc, RequestValidationError):
status_code = 400
return JSONResponse(content=payload.dict(), status_code=status_code)

return JSONResponse(content=message, status_code=status_code)

@app.get("/clusters", response_model=ResponseEnvelope[List[str]])
@app.get("/clusters")
async def clusters() -> List[str]:
return ResponseEnvelope[List[str]](
execution_time=0.0,
message="ok",
status="success",
result=[s.name for s in state.tracker.state_managers],
)

return (s.name for s in state.tracker.state_managers)
@app.get(
"/machines",
response_model=ResponseEnvelope[Dict[str, Dict[str, Dict[str, List[str]]]]],
response_model=Dict[str, Dict[str, Dict[str, List[str]]]],
)
async def get_machines(
cluster_names: Optional[List[str]] = Query(None, alias="cluster"),
Expand All @@ -134,9 +120,4 @@ async def get_machines(
topology.name
] = topology.get_machines()

return ResponseEnvelope[Dict[str, Dict[str, Dict[str, List[str]]]]](
execution_time=0.0,
result=response,
status="success",
message="ok",
)
return response
4 changes: 3 additions & 1 deletion heron/tools/tracker/src/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import uvicorn

Log = log.Log

Log.setLevel(logging.DEBUG)

def create_tracker_config(config_file: str, stmgr_override: dict) -> dict:
# try to parse the config file if we find one
Expand Down Expand Up @@ -125,6 +125,8 @@ def cli(

log_level = logging.DEBUG if verbose else logging.INFO
log.configure(log_level)
global Log
Log = log.Log
surahman marked this conversation as resolved.
Show resolved Hide resolved

stmgr_override = {
"type": stmgr_type,
Expand Down
Loading