Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UNOMI-885: fix migration error on rollover alias #716

Merged
merged 4 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/unomi-ci-build-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ jobs:
- name: Integration tests
run: mvn -ntp clean install -Pintegration-tests
- name: Archive code coverage logs
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: false # UNOMI-746 Reactivate if necessary
with:
name: unomi-code-coverage-jdk11-${{ github.run_number }}
path: itests/target/site/jacoco
- name: Archive unomi logs
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: failure()
with:
name: unomi-log-jdk11-${{ github.run_number }}
Expand Down
4 changes: 2 additions & 2 deletions itests/src/test/java/org/apache/unomi/itests/AllITs.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.unomi.itests;

import org.apache.unomi.itests.migration.Migrate16xTo220IT;
import org.apache.unomi.itests.migration.Migrate16xToCurrentVersionIT;
import org.apache.unomi.itests.graphql.*;
import org.apache.unomi.itests.migration.MigrationIT;
import org.junit.runner.RunWith;
Expand All @@ -31,7 +31,7 @@
*/
@RunWith(Suite.class)
@SuiteClasses({
Migrate16xTo220IT.class,
Migrate16xToCurrentVersionIT.class,
MigrationIT.class,
BasicIT.class,
ConditionEvaluatorIT.class,
Expand Down
1 change: 1 addition & 0 deletions itests/src/test/java/org/apache/unomi/itests/BaseIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ public Option[] config() {
editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.cluster.name", "contextElasticSearchITests"),
editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.addresses", "localhost:9400"),
editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.taskWaitingPollingInterval", "50"),
editConfigurationFilePut("etc/custom.system.properties", "org.apache.unomi.elasticsearch.rollover.maxDocs", "300"),

systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
import java.io.IOException;
import java.util.*;

public class Migrate16xTo220IT extends BaseIT {
public class Migrate16xToCurrentVersionIT extends BaseIT {

private int eventCount = 0;
private int sessionCount = 0;
private Set<String[]> initialScopes = new HashSet<>();

private static final String SCOPE_NOT_EXIST = "SCOPE_NOT_EXIST";
private static final int NUMBER_DUPLICATE_SESSIONS = 3;
private static final List<String> oldSystemItemsIndices = Arrays.asList("context-actiontype", "context-campaign", "context-campaignevent", "context-goal",
"context-userlist", "context-propertytype", "context-scope", "context-conditiontype", "context-rule", "context-scoring", "context-segment", "context-groovyaction", "context-topic",
"context-patch", "context-jsonschema", "context-importconfig", "context-exportconfig", "context-rulestats");
Expand Down Expand Up @@ -100,12 +99,12 @@ public void checkMigratedData() throws Exception {
checkPagePathForEventView();
checkPastEvents();
checkScopeEventHaveBeenUpdated();
countNumberOfSessionIndices();
}

/**
* Checks if at least the new index for events and sessions exists.
* Also checks:
* - duplicated sessions are correctly removed (-3 sessions in final count)
* - persona sessions are now merged in session index due to index reduction in 2_2_0 (+2 sessions in final count)
*/
private void checkEventSessionRollover2_2_0() throws IOException {
Expand All @@ -122,7 +121,7 @@ private void checkEventSessionRollover2_2_0() throws IOException {
newSessioncount += countItems(httpClient, sessionIndex, null);
}
Assert.assertEquals(eventCount, newEventcount);
Assert.assertEquals(sessionCount - NUMBER_DUPLICATE_SESSIONS, newSessioncount);
Assert.assertEquals(sessionCount, newSessioncount);
}

private void checkIndexReductions2_2_0() throws IOException {
Expand Down Expand Up @@ -339,6 +338,14 @@ private void initCounts(CloseableHttpClient httpClient) {
}
}

private void countNumberOfSessionIndices() {
try {
Set<String> sessionIndices = MigrationUtils.getIndexesPrefixedBy(httpClient, "http://localhost:9400", "context-session");
Assert.assertEquals(2, sessionIndices.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void getScopeFromEvents(CloseableHttpClient httpClient, String eventIndex) throws IOException {
String requestBody = resourceAsString("migration/match_all_login_event_request.json");
JsonNode jsonNode = objectMapper.readTree(HttpUtils.executePostRequest(httpClient, "http://localhost:9400" + "/" + eventIndex + "/_search", requestBody, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ public static boolean indexExists(CloseableHttpClient httpClient, String esAddre
}
}

public static void configureAlias(CloseableHttpClient httpClient, String esAddress, String alias, String writeIndex, Set<String> readIndices, String configureAliasBody, MigrationContext context) throws IOException {
String readIndicesToAdd = "";
if (!readIndices.isEmpty()) {
readIndicesToAdd = "," + readIndices.stream().map(index -> "{\"add\": {\"index\": \"" + index + "\", \"alias\": \"" + alias + "\", \"is_write_index\": false}}").collect(Collectors.joining(","));
}
if (context != null) {
context.printMessage("Will set " + writeIndex + " as write index for alias " + alias);
context.printMessage("Will set " + readIndices.toString() + " as read indices");
} else {
LOGGER.info("Will set {} as write index for alias {}", writeIndex, alias);
LOGGER.info("Will set {} as read indices", readIndices.toString());
}
String requestBody = configureAliasBody.replace("#writeIndexName", writeIndex).replace("#aliasName", alias).replace("#readIndicesToAdd", readIndicesToAdd);

HttpUtils.executePostRequest(httpClient, esAddress + "/_aliases", requestBody, null);
}

public static Set<String> getIndexesPrefixedBy(CloseableHttpClient httpClient, String esAddress, String prefix) throws IOException {
try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) {
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
Expand Down Expand Up @@ -300,9 +317,9 @@ public static void waitForYellowStatus(CloseableHttpClient httpClient, String es
* <p>This method sends a request to update documents that match the provided query in the specified index. The update operation is
* performed asynchronously, and the method waits for the task to complete before returning.</p>
*
* @param httpClient the CloseableHttpClient used to send the request to the Elasticsearch server
* @param esAddress the address of the Elasticsearch server
* @param indexName the name of the index where documents should be updated
* @param httpClient the CloseableHttpClient used to send the request to the Elasticsearch server
* @param esAddress the address of the Elasticsearch server
* @param indexName the name of the index where documents should be updated
* @param requestBody the JSON body containing the query and update instructions for the documents
* @throws Exception if there is an error during the HTTP request or while waiting for the task to finish
*/
Expand Down Expand Up @@ -332,23 +349,66 @@ public static void deleteByQuery(CloseableHttpClient httpClient, String esAddres
waitForTaskToFinish(httpClient, esAddress, task.getString("task"), null);
}

private static void printResponseDetail(JSONObject response, MigrationContext migrationContext){
StringBuilder sb = new StringBuilder();
if (response.has("total")) {
sb.append("Total: ").append(response.getInt("total")).append(" ");
}
if (response.has("updated")) {
sb.append("Updated: ").append(response.getInt("updated")).append(" ");
}
if (response.has("created")) {
sb.append("Created: ").append(response.getInt("created")).append(" ");
}
if (response.has("deleted")) {
sb.append("Deleted: ").append(response.getInt("deleted")).append(" ");
}
if (response.has("batches")) {
sb.append("Batches: ").append(response.getInt("batches")).append(" ");
}
if (migrationContext != null) {
migrationContext.printMessage(sb.toString());
} else {
LOGGER.info(sb.toString());
}
}

public static void waitForTaskToFinish(CloseableHttpClient httpClient, String esAddress, String taskId, MigrationContext migrationContext) throws IOException {
while (true) {
final JSONObject status = new JSONObject(
HttpUtils.executeGetRequest(httpClient, esAddress + "/_tasks/" + taskId,
null));
if (status.has("error")) {
final JSONObject error = status.getJSONObject("error");
throw new IOException("Task error: " + error.getString("type") + " - " + error.getString("reason"));
}
if (status.has("completed") && status.getBoolean("completed")) {
if (migrationContext != null) {
migrationContext.printMessage("Task is completed");
} else {
LOGGER.info("Task is completed");
}
if (status.has("response")) {
final JSONObject response = status.getJSONObject("response");
printResponseDetail(response, migrationContext);
if (response.has("failures")) {
final JSONArray failures = response.getJSONArray("failures");
if (!failures.isEmpty()) {
for (int i = 0; i < failures.length(); i++) {
JSONObject failure = failures.getJSONObject(i);
JSONObject cause = failure.getJSONObject("cause");
if (migrationContext != null) {
migrationContext.printMessage("Cause of failure: " + cause.toString());
} else {
LOGGER.error("Cause of failure: {}", cause.toString());
}
}
throw new IOException("Task completed with failures, check previous log for details");
}
}
}
break;
}
if (status.has("error")) {
final JSONObject error = status.getJSONObject("error");
throw new IOException("Task error: " + error.getString("type") + " - " + error.getString("reason"));
}
if (migrationContext != null) {
migrationContext.printMessage("Waiting for Task " + taskId + " to complete");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ context.performMigrationStep("2.2.0-create-event-index", () -> {
if (!MigrationUtils.indexExists(context.getHttpClient(), esAddress, newEventIndex)) {
String baseRequest = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/base_index_withRollover_request.json")
String mapping = MigrationUtils.extractMappingFromBundles(bundleContext, "event.json")
String configureAliasBody = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/configure_alias_body.json")

String newIndexSettings = MigrationUtils.buildIndexCreationRequestWithRollover(baseRequest, mapping, context, rolloverPolicyName, rolloverEventAlias)
HttpUtils.executePutRequest(context.getHttpClient(), esAddress + "/" + newEventIndex, newIndexSettings, null)
MigrationUtils.configureAlias(context.getHttpClient(), esAddress, rolloverEventAlias, newEventIndex, Collections.emptySet(), configureAliasBody, context)
}
})

Expand All @@ -73,9 +75,11 @@ context.performMigrationStep("2.2.0-create-session-index", () -> {
if (!MigrationUtils.indexExists(context.getHttpClient(), esAddress, newSessionIndex)) {
String baseRequest = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/base_index_withRollover_request.json")
String mapping = MigrationUtils.extractMappingFromBundles(bundleContext, "session.json")
String configureAliasBody = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/configure_alias_body.json")

String newIndexSettings = MigrationUtils.buildIndexCreationRequestWithRollover(baseRequest, mapping, context, rolloverPolicyName, rolloverSessionAlias)
HttpUtils.executePutRequest(context.getHttpClient(), esAddress + "/" + newSessionIndex, newIndexSettings, null)
MigrationUtils.configureAlias(context.getHttpClient(), esAddress, rolloverSessionAlias, newSessionIndex, Collections.emptySet(), configureAliasBody, context)
}
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import org.apache.unomi.shell.migration.service.MigrationContext
import org.apache.unomi.shell.migration.utils.HttpUtils
import org.apache.unomi.shell.migration.utils.MigrationUtils
import org.osgi.framework.BundleContext
import org.osgi.framework.Bundle

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
Expand All @@ -25,7 +22,7 @@ MigrationContext context = migrationContext
String esAddress = context.getConfigString("esAddress")
String indexPrefix = context.getConfigString("indexPrefix")
String rolloverPolicyName = indexPrefix + "-unomi-rollover-policy"
String rolloverEventAlias = indexPrefix + "-session"
String rolloverSessionAlias = indexPrefix + "-session"

context.performMigrationStep("2.5.0-clean-profile-mapping", () -> {
String baseSettings = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.0.0/base_index_mapping.json")
Expand All @@ -39,10 +36,17 @@ context.performMigrationStep("2.5.0-clean-session-mapping", () -> {
String baseSettings = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/base_index_withRollover_request.json")
String cleanPastEventScript = MigrationUtils.getFileWithoutComments(bundleContext, "requestBody/2.5.0/remove_pastEvents_session.painless")
String mapping = MigrationUtils.extractMappingFromBundles(bundleContext, "session.json")
String newIndexSettings = MigrationUtils.buildIndexCreationRequestWithRollover(baseSettings, mapping, context, rolloverPolicyName, rolloverEventAlias)
String newIndexSettings = MigrationUtils.buildIndexCreationRequestWithRollover(baseSettings, mapping, context, rolloverPolicyName, rolloverSessionAlias)
Set<String> sessionIndices = MigrationUtils.getIndexesPrefixedBy(context.getHttpClient(), esAddress, "${indexPrefix}-session-")
String configureAliasBody = MigrationUtils.resourceAsString(bundleContext, "requestBody/2.2.0/configure_alias_body.json")

sessionIndices.each { sessionIndex ->
Set<String> sortedSet = new TreeSet<>(sessionIndices)
sortedSet.each { sessionIndex ->
MigrationUtils.reIndex(context.getHttpClient(), bundleContext, esAddress, sessionIndex, newIndexSettings, cleanPastEventScript, context, "2.5.0-clean-session-mapping")
}
SortedSet<String> allExceptLast = Collections.emptySortedSet();
if (sortedSet.size() > 1){
allExceptLast = sortedSet.headSet(sortedSet.last());
}
MigrationUtils.configureAlias(context.getHttpClient(), esAddress, rolloverSessionAlias, sortedSet.last(), allExceptLast, configureAliasBody, context)
})
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import org.apache.unomi.shell.migration.service.MigrationContext
import org.apache.unomi.shell.migration.utils.HttpUtils
import org.apache.unomi.shell.migration.utils.MigrationUtils

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"source": {
"index": "#source"
"index": "#source",
"size": 5000
},
"dest": {
"index": "#dest"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
},
"aliases": {
"#lifecycleRolloverAlias": {
"is_write_index": true
"is_write_index": false
}
},
"mappings": #mappings
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"source": {
"index": "#source"
"index": "#source",
"size": 5000
},
"dest": {
"index": "#dest"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"actions": [
{
"add": {
"index": "#writeIndexName",
"alias": "#aliasName",
"is_write_index": true
}
}#readIndicesToAdd
]
}
Loading