Skip to content

Commit

Permalink
Merge pull request #4 from ProjectEKA/metric-refactor
Browse files Browse the repository at this point in the history
Dubey | 000 | Refactor monitor to not make adhoc heartbeat calls
  • Loading branch information
mddubey authored Oct 5, 2020
2 parents b08675a + e3b9954 commit a90ff7c
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 197 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ dependencies {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
compile group: 'io.prometheus', name: 'simpleclient', version: '0.9.0'
compile group: 'io.prometheus', name: 'simpleclient_common', version: '0.9.0'
implementation 'io.vertx:vertx-pg-client:3.9.0'
implementation('org.postgresql:postgresql')
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.3.2.RELEASE'
}

test {
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/in/projecteka/monitor/FetchMetricTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package in.projecteka.monitor;

import in.projecteka.monitor.model.HeartbeatResponse;
import in.projecteka.monitor.model.ServiceProperties;
import in.projecteka.monitor.model.Status;
import lombok.AllArgsConstructor;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

@AllArgsConstructor
public class FetchMetricTask {
private final MetricRepository metricRepository;
private final MetricServiceClient metricServiceClient;

public void fetchMetricAndSave(ServiceProperties property){
String path = String.format("%s%s", property.getUrl(), Constants.PATH_HEARTBEAT);
boolean isAccessible = metricServiceClient.isBridgeAccessible(path);
if (isAccessible){
metricRepository.addMetric(property.getId(), property.getName(), property.getType(), path, Status.UP.name(), LocalDateTime.now(ZoneOffset.UTC), LocalDateTime.now(ZoneOffset.UTC));
}
else {
metricRepository.addMetric(property.getId(), property.getName(), property.getType(), path, Status.DOWN.name(), null, LocalDateTime.now(ZoneOffset.UTC));
}
}
}
82 changes: 0 additions & 82 deletions src/main/java/in/projecteka/monitor/Metric.java

This file was deleted.

4 changes: 2 additions & 2 deletions src/main/java/in/projecteka/monitor/MetricController.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
@RestController
@AllArgsConstructor
public class MetricController {
private final Metric metric;
private final MetricService metricService;

@GetMapping(path = "/metrics")
public void metrics(Writer responseWriter) throws IOException {
metric.processRequests();
metricService.processRequests();
TextFormat.write004(responseWriter, CollectorRegistry.defaultRegistry.metricFamilySamples());
responseWriter.close();
}
Expand Down
115 changes: 61 additions & 54 deletions src/main/java/in/projecteka/monitor/MetricRepository.java
Original file line number Diff line number Diff line change
@@ -1,69 +1,76 @@
package in.projecteka.monitor;

import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import in.projecteka.monitor.model.Status;
import lombok.AllArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;

@AllArgsConstructor
public class MetricRepository {
private static final Logger logger = LoggerFactory.getLogger(MetricRepository.class);
private static final String INSERT_TO_METRICS = "INSERT INTO metrics " +
"(path, status, last_up_time) VALUES ($1, $2, $3)";
private static final String UPDATE_TO_METRICS = "UPDATE metrics SET last_up_time = $1 WHERE path = $2";
private static final String SELECT_LAST_UP_TIME = "SELECT last_up_time FROM metrics WHERE path=$1";
private final PgPool dbClient;
private JdbcTemplate jdbcTemplate;

public MetricRepository(PgPool dbClient) {
this.dbClient = dbClient;
public void addMetric(String bridgeId, String name, String type, String path, String status, LocalDateTime lastUpTime, LocalDateTime lastCheckTime) {
insertOrUpdateMetric(bridgeId, name, type, path, status, lastUpTime, lastCheckTime);
insertMetricHistory(bridgeId, name, type, path, status, lastCheckTime);
}

public Mono<Void> insert(String path, String status, LocalDateTime lastUpTime) {
return Mono.create(monoSink ->
dbClient.preparedQuery(INSERT_TO_METRICS)
.execute(Tuple.of(path, status, lastUpTime),
handler -> {
if (handler.failed()) {
logger.error(handler.cause().getMessage(), handler.cause());
monoSink.error(new DbOperationError());
return;
}
monoSink.success();
}));
private void insertMetricHistory(String bridgeId, String name, String type, String path, String status, LocalDateTime dateCreated) {
String sql = "INSERT INTO metrics_history(bridge_id, name, type, path, status, date_created) values (?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(sql, bridgeId, name, type, path, status, dateCreated);
}

public Mono<Void> update(LocalDateTime lastUpTime, String path) {
return Mono.create(monoSink ->
dbClient.preparedQuery(UPDATE_TO_METRICS)
.execute(Tuple.of(lastUpTime, path),
handler -> {
if (handler.failed()) {
logger.error(handler.cause().getMessage(), handler.cause());
monoSink.error(new DbOperationError());
return;
}
monoSink.success();
}));
private void insertOrUpdateMetric(String bridgeId, String name, String type, String path, String status, LocalDateTime lastUpTime, LocalDateTime lastCheckTime) {
if (!exist(path)) {
String sql = "INSERT INTO metrics(bridge_id, name, type, path, status, last_up_time, last_check_time) values (?, ?, ?, ?, ?, ?, ?)";
jdbcTemplate.update(sql, bridgeId, name, type, path, status, lastUpTime, lastCheckTime);
} else {
String updateMetricsQuery = "UPDATE metrics SET status = '" + status + "',";
if (lastUpTime != null) {
updateMetricsQuery += "last_up_time = '" + lastUpTime + "', ";
}
updateMetricsQuery += "last_check_time = '" + lastCheckTime + "' WHERE path = '" + path + "'";
jdbcTemplate.update(updateMetricsQuery);
}
}

public Mono<String> getIfPresent(String path) {
return Mono.create(monoSink ->
dbClient.preparedQuery(SELECT_LAST_UP_TIME)
.execute(Tuple.of(path),
handler -> {
if (handler.failed()) {
logger.error(handler.cause().getMessage(), handler.cause());
monoSink.error(new DbOperationError());
return;
}
var iterator = handler.result().iterator();
if (!iterator.hasNext()) {
monoSink.success("");
return;
}
monoSink.success(iterator.next().getValue("last_up_time").toString());
}));
public boolean exist(String path) {
Integer count = jdbcTemplate.queryForObject(
"SELECT count(*) FROM metrics WHERE path=?", new Object[]{path},
Integer.class);
return count > 0;
}

public List<Metrics> findAllMetrics() {
String sql = "select bridge_id, name, type, path, status, last_up_time, last_check_time from metrics";
return jdbcTemplate.query(sql, this::mapToMetrics);
}

private Metrics mapToMetrics(ResultSet resultSet, int rowNum) throws SQLException {
String bridgeId = resultSet.getString("bridge_id");
String name = resultSet.getString("name");
String type = resultSet.getString("type");
String path = resultSet.getString("path");
String status = resultSet.getString("status");
LocalDateTime lastUpTime = getLocalDateTime(resultSet, "last_up_time");
LocalDateTime lastCheckTime = getLocalDateTime(resultSet, "last_check_time");
return Metrics.builder()
.bridgeId(bridgeId)
.name(name)
.type(type)
.path(path)
.status(Status.valueOf(status))
.lastUpTime(lastUpTime)
.lastCheckTime(lastCheckTime)
.build();
}

private LocalDateTime getLocalDateTime(ResultSet resultSet, String columnLabel) throws SQLException {
Timestamp timestamp = resultSet.getTimestamp(columnLabel);
return timestamp !=null ? timestamp.toLocalDateTime() : null;
}
}
42 changes: 42 additions & 0 deletions src/main/java/in/projecteka/monitor/MetricScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package in.projecteka.monitor;

import in.projecteka.monitor.model.Service;
import in.projecteka.monitor.model.ServiceProperties;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

@AllArgsConstructor
public class MetricScheduler {
private final TaskExecutor executor;
private final MetricRepository metricRepository;
private final MetricServiceClient metricServiceClient;

private static final Logger logger = LoggerFactory.getLogger(MetricScheduler.class);

@Scheduled(fixedDelayString = "${monitor.scheduler.interval}", initialDelayString = "${monitor.scheduler.initialDelay}")
public void fetchMetrics() {
logger.info("Starting to fetch metrics");
Service service = metricServiceClient.getService();

process(service.getBridgeProperties());
process(service.getConsentManagerProperties());
}

private void process(List<ServiceProperties> properties) {
properties.forEach(property -> executor.execute(
() -> {
try {
new FetchMetricTask(metricRepository, metricServiceClient).fetchMetricAndSave(property);
} catch (Exception e) {
logger.error(String.format("Error while fetching metric for bridge %s with id %s from path %s", property.getName(), property.getId(), property.getUrl()), e);
}
})
);
}

}
49 changes: 49 additions & 0 deletions src/main/java/in/projecteka/monitor/MetricService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package in.projecteka.monitor;

import io.prometheus.client.Gauge;
import lombok.AllArgsConstructor;

import java.util.HashMap;
import java.util.List;

import static in.projecteka.monitor.model.Status.DOWN;

@AllArgsConstructor
public class MetricService {
public static final String PROJECT_EKA_METRICS = "Projecteka_metrics_";
private final MetricRepository metricRepository;
private static final HashMap<String, Gauge> gaugeMap = new HashMap<>();

public void processRequests() {
List<Metrics> allMetrics = metricRepository.findAllMetrics();
for (Metrics metrics : allMetrics) {
if (!gaugeMap.containsKey(PROJECT_EKA_METRICS + metrics.getType())) {
Gauge gaugeStatus = Gauge.build()
.labelNames("Name", "Id", "Path", "Status", "LastUpTime", "LastCheckTime")
.name(PROJECT_EKA_METRICS + metrics.getType())
.help("Heartbeat Status")
.register();
appendToStatus(metrics, gaugeStatus);
}else{
appendToStatus(metrics, gaugeMap.get(PROJECT_EKA_METRICS + metrics.getType()));
}
}
}

private void appendToStatus(Metrics metrics, Gauge gaugeStatus) {
gaugeMap.put(PROJECT_EKA_METRICS + metrics.getType(), gaugeStatus);
Gauge.Child child = gaugeStatus.labels(metrics.getName(),
metrics.getBridgeId(),
metrics.getPath(),
metrics.getStatus().name(),
String.valueOf(metrics.getLastUpTime()),
String.valueOf(metrics.getLastCheckTime()));

//What does the below do exactly?
if (DOWN.equals(metrics.getStatus())) {
child.set(0);
} else {
child.inc();
}
}
}
Loading

0 comments on commit a90ff7c

Please sign in to comment.