
Commit 5378604

Merge remote-tracking branch 'origin/master'

slilichenko committed Jan 16, 2025
2 parents 78a5a13 + 46699a0
Showing 219 changed files with 55,714 additions and 12,870 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/beam_PerformanceTests_xlang_KafkaIO_Python.yml
@@ -23,12 +23,12 @@ on:
# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
- pull-requests: write
- checks: write
+ pull-requests: read
+ checks: read
contents: read
deployments: read
id-token: none
- issues: write
+ issues: read
discussions: read
packages: read
pages: read
@@ -49,17 +49,18 @@ env:
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}

jobs:
- beam_PerformanceTests_xlang_KafkaIO_Python:
+ # Using 'PerfTests' instead of 'PerformanceTests' to comply with Kafka Strimzi's name length limitations.
+ beam_PerfTests_xlang_KafkaIO_Python:
if: |
github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event.comment.body == 'Run Python xlang KafkaIO Performance Test'
- runs-on: [self-hosted, ubuntu-20.04, main]
+ runs-on: [self-hosted, ubuntu-20.04, highmem]
timeout-minutes: 240
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
- job_name: ["beam_PerformanceTests_xlang_KafkaIO_Python"]
+ job_name: ["beam_PerfTests_xlang_KafkaIO_Python"]
job_phrase: ["Run Python xlang KafkaIO Performance Test"]
steps:
- uses: actions/checkout@v4
@@ -76,7 +77,7 @@ jobs:
- name: Set k8s access
uses: ./.github/actions/setup-k8s-access
with:
- cluster_name: beam-utility
+ cluster_name: kafka-workflows
k8s_namespace: ${{ matrix.job_name }}-${{ github.run_id }}
cluster_zone: us-central1
- name: Install Kafka
@@ -119,4 +120,4 @@ jobs:
-Prunner=DataflowRunner \
-PloadTest.mainClass=apache_beam.io.external.xlang_kafkaio_perf_test \
-PpythonVersion=3.9 \
- '-PloadTest.args=${{ env.beam_PerformanceTests_xlang_KafkaIO_Python_test_arguments_1 }}'
+ '-PloadTest.args=${{ env.beam_PerfTests_xlang_KafkaIO_Python_test_arguments_1 }}'
@@ -23,8 +23,8 @@
--metrics_table=python_kafkaio_results
--influx_measurement=python_kafkaio_results
--test_class=KafkaIOPerfTest
- --input_options=''{\\"num_records\\":100000000,\\"key_size\\":10,\\"value_size\\":90,\\"algorithm\\":\\"lcg\\"}''
+ --input_options=''{\\"num_records\\":50000000,\\"key_size\\":10,\\"value_size\\":90,\\"algorithm\\":\\"lcg\\"}''
--kafka_topic=beam
- --read_timeout=1500
+ --read_timeout=3000
--num_workers=5
--autoscaling_algorithm=NONE
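
As a quick sanity check on the halved record count, a sketch, assuming `key_size` and `value_size` in the `input_options` above are bytes per record (the usual interpretation for Beam's synthetic source):

```python
# Rough synthetic-data volume under the old and new input_options.
def total_gb(num_records: int, key_size: int = 10, value_size: int = 90) -> float:
    return num_records * (key_size + value_size) / 1e9

print(total_gb(100_000_000))  # old setting: ~10.0 GB
print(total_gb(50_000_000))   # new setting: ~5.0 GB
```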
1 change: 1 addition & 0 deletions CHANGES.md
@@ -85,6 +85,7 @@

* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).
* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)), but forced Debezium IO to use protobuf 3 ([#33541](https://github.com/apache/beam/issues/33541)) because Debezium clients are not protobuf 4 compatible. This may cause conflicts when using clients which are only compatible with protobuf 4.
+ * Minimum Go version for Beam Go updated to 1.22.10 ([#33609](https://github.com/apache/beam/pull/33609))

## Deprecations

@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.gradle

/**
* Utilities for working with our vendored version of gRPC.
*
* To update:
* 1. Determine the set of io.grpc libraries we want to include, most likely a superset
* of the previous vendored gRPC version.
* 2. Use mvn dependency:tree and https://search.maven.org/search?q=g:io.grpc
* to determine the dependency tree. You may need to search for optional dependencies
* and determine if they are needed (e.g. conscrypt).
* 3. Validate built artifacts by running linkage tool
* (https://github.com/apache/beam/tree/master/vendor#how-to-validate-the-vendored-dependencies)
* and unit and integration tests in a PR (e.g. https://github.com/apache/beam/pull/16460,
* https://github.com/apache/beam/pull/16459)
*/
class GrpcVendoring_1_69_0 {
static def grpc_version = "1.69.0"

// See https://github.com/grpc/grpc-java/blob/v1.69.0/gradle/libs.versions.toml
// or https://search.maven.org/search?q=io.grpc%201.69.0
static def guava_version = "33.3.1-jre"
static def protobuf_version = "3.25.5"
static def gson_version = "2.11.0"
static def google_auth_version = "1.24.1"
static def opencensus_version = "0.31.1"
static def conscrypt_version = "2.5.2"
static def proto_google_common_protos_version = "2.48.0"

/** Returns the list of implementation-time dependencies. */
static List<String> dependencies() {
return [
"com.google.guava:guava:$guava_version",
"com.google.protobuf:protobuf-java:$protobuf_version",
"com.google.protobuf:protobuf-java-util:$protobuf_version",
"com.google.code.gson:gson:$gson_version",
"io.grpc:grpc-alts:$grpc_version",
"io.grpc:grpc-auth:$grpc_version",
"io.grpc:grpc-context:$grpc_version",
"io.grpc:grpc-core:$grpc_version",
"io.grpc:grpc-netty-shaded:$grpc_version",
"io.grpc:grpc-protobuf:$grpc_version",
"io.grpc:grpc-services:$grpc_version",
"io.grpc:grpc-stub:$grpc_version",
"io.grpc:grpc-testing:$grpc_version",
"io.grpc:grpc-util:$grpc_version",
"com.google.auth:google-auth-library-credentials:$google_auth_version",
"com.google.api.grpc:proto-google-common-protos:$proto_google_common_protos_version",
"io.opencensus:opencensus-api:$opencensus_version",
"io.opencensus:opencensus-contrib-grpc-metrics:$opencensus_version",
]
}

/**
* Returns the list of dependencies that should be exported as runtime
* dependencies within the vendored jar.
*/
static List<String> runtimeDependencies() {
return [
'com.google.auto.value:auto-value-annotations:1.8.2',
'com.google.errorprone:error_prone_annotations:2.20.0',
// transitive dependencies of grpc-alts -> google-auth-library-oauth2-http -> google-http-client:
'org.apache.httpcomponents:httpclient:4.5.13',
'org.apache.httpcomponents:httpcore:4.4.15',
// TODO(BEAM-9288): Enable relocation for conscrypt
"org.conscrypt:conscrypt-openjdk-uber:$conscrypt_version"
]
}

/**
* Returns the list of test dependencies.
*/
static List<String> testDependencies() {
return [
'junit:junit:4.12',
]
}

static Map<String, String> relocations() {
// The relocation paths below specifically use gRPC and the full version string as
// the code relocation prefix. See https://lists.apache.org/thread.html/4c12db35b40a6d56e170cd6fc8bb0ac4c43a99aa3cb7dbae54176815@%3Cdev.beam.apache.org%3E
// for further details.

// To produce the list of necessary relocations, start with the set of target packages
// to vendor, find all of their necessary transitive dependencies, and provide
// relocations for each such that all necessary packages and their dependencies are
// relocated. Any optional dependency that doesn't need relocation must be excluded
// via an 'exclude' rule. Libraries that use JNI or reflection add further complexity
// and must be handled case by case: consult each library's documentation to learn
// whether it supports relocation and how to perform it. The
// 'validateShadedJarDoesntLeakNonOrgApacheBeamClasses' task ensures that there are
// no classes outside of the 'org.apache.beam' namespace.

String version = "v" + grpc_version.replace(".", "p")
String prefix = "org.apache.beam.vendor.grpc.${version}"
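// e.g. for grpc_version "1.69.0": version == "v1p69p0" and prefix == "org.apache.beam.vendor.grpc.v1p69p0"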
List<String> packagesToRelocate = [
// guava uses the com.google.common and com.google.thirdparty package namespaces
"com.google.common",
"com.google.thirdparty",
"com.google.protobuf",
"com.google.gson",
"com.google.auth",
"com.google.api",
"com.google.cloud",
"com.google.logging",
"com.google.longrunning",
"com.google.rpc",
"com.google.type",
"com.google.geo.type",
"io.grpc",
"io.opencensus",
"io.perfmark",
]

return packagesToRelocate.collectEntries {
[ (it): "${prefix}.${it}" ]
} + [
// Redirect io.grpc.netty.shaded to the top-level prefix to keep the namespace
// consistent with what it was before switching from io.grpc:grpc-netty.
"io.grpc.netty.shaded": "${prefix}",
] + [
// Adapted from https://github.com/grpc/grpc-java/blob/e283f70ad91f99c7fee8b31b605ef12a4f9b1690/netty/shaded/build.gradle#L41
// We have to be careful with these replacements as they must not match any
// string in NativeLibraryLoader, else they cause corruption. Note that
// this includes concatenation of string literals and constants.
'META-INF/native/io_grpc_netty_shaded_netty': "META-INF/native/org_apache_beam_vendor_grpc_${version}_netty",
'META-INF/native/libio_grpc_netty_shaded_netty': "META-INF/native/liborg_apache_beam_vendor_grpc_${version}_netty",
]
}

static Map<String, List<String>> relocationExclusions() {
// Sub-packages excluded from relocation.
return [
"io.grpc": ["io.grpc.netty.shaded.**"],
]
}

/** Returns the list of shading exclusions. */
static List<String> exclusions() {
return [
// Don't include in the vendored jar:
// android annotations, autovalue annotations, errorprone, checkerframework, JDK8 annotations, objenesis, junit,
// apache commons, log4j, slf4j and mockito
"android/annotation/**/",
"com/google/apps/**",
"com/google/auto/value/**",
"com/google/errorprone/**",
"com/google/instrumentation/**",
"com/google/j2objc/annotations/**",
"com/google/shopping/**",
"io/grpc/netty/shaded/io/netty/handler/codec/marshalling/**",
"io/grpc/netty/shaded/io/netty/handler/codec/spdy/**",
"io/grpc/netty/shaded/io/netty/handler/codec/compression/JZlib*",
"io/grpc/netty/shaded/io/netty/handler/codec/compression/Lz4*",
"io/grpc/netty/shaded/io/netty/handler/codec/compression/Lzf*",
"io/grpc/netty/shaded/io/netty/handler/codec/compression/Lzma*",
"io/grpc/netty/shaded/io/netty/handler/codec/protobuf/Protobuf*Nano.class",
"io/grpc/netty/shaded/io/netty/util/internal/logging/CommonsLogger*",
"io/grpc/netty/shaded/io/netty/util/internal/logging/LocationAwareSlf4JLogger*",
"io/grpc/netty/shaded/io/netty/util/internal/logging/Log4JLogger*",
"io/grpc/netty/shaded/io/netty/util/internal/logging/Log4J2Logger*",
"javax/annotation/**",
"junit/**",
"module-info.class",
"org/apache/commons/logging/**",
"org/apache/commons/codec/**",
"org/apache/http/**",
"org/checkerframework/**",
"org/codehaus/mojo/animal_sniffer/**",
"org/conscrypt/**",
"META-INF/native/libconscrypt**",
"META-INF/native/conscrypt**",
"org/hamcrest/**",
"org/junit/**",
"org/mockito/**",
"org/objenesis/**",
// proto source files
"google/**/*.proto",
"grpc/**/*.proto",
]
}

/**
* Returns a closure containing the dependency configuration used for shading gRPC within the main
* Apache Beam project.
*/
static Object dependenciesClosure() {
return {
dependencies().each { implementation it }
runtimeDependencies().each { shadow it }
}
}

/**
* Returns a closure with the code relocation configuration for shading gRPC within the main
* Apache Beam project.
*/
static Object shadowClosure() {
def relocExclusions = relocationExclusions()
return {
relocations().each { srcNamespace, destNamespace ->
relocate(srcNamespace, destNamespace) {
if (relocExclusions.containsKey(srcNamespace)) {
relocExclusions.get(srcNamespace).each { toExclude ->
exclude toExclude
}
}
}
}
exclusions().each { exclude it }
}
}
}
34 changes: 29 additions & 5 deletions examples/notebooks/beam-ml/bigtable_enrichment_transform.ipynb
@@ -605,15 +605,39 @@
},
{
"cell_type": "markdown",
"metadata": {
"id": "F-xjiP_pHWZr"
},
"source": [
"metadata": {},
"source": [
"### What is a Cross-Join?\n",
"A cross-join is a Cartesian product operation where each row from one table is combined with every row from another table. It is useful when we want to create all possible combinations of two datasets.\n",
"\n",
"**Example:**\n",
"Table A:\n",
" | A1 | A2 |\n",
" |----|----|\n",
" | 1 | X |\n",
" | 2 | Y |\n",
"\n",
"Table B:\n",
" | B1 | B2 |\n",
" |----|----|\n",
" | 10 | P |\n",
" | 20 | Q |\n",
"\n",
"**Result of Cross-Join:**\n",
" | A1 | A2 | B1 | B2 |\n",
" |----|----|----|----|\n",
" | 1 | X | 10 | P |\n",
" | 1 | X | 20 | Q |\n",
" | 2 | Y | 10 | P |\n",
" | 2 | Y | 20 | Q |\n",
"\n",
"Cross-joins can be computationally expensive for large datasets, so use them judiciously.\n",
"\n",
"By default, the enrichment transform performs a [`cross_join`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment.html#apache_beam.transforms.enrichment.cross_join). This join returns the enriched row with the following fields: `sale_id`, `customer_id`, `product_id`, `quantity`, `price`, and `customer_location`.\n",
"\n",
"To make a prediction when running the ecommerce example, however, the trained model needs the following fields: `product_id`, `quantity`, `price`, `customer_id`, and `customer_location`.\n",
"\n",
"Therefore, to get the required fields for the ecommerce example, design a custom join function that takes two dictionaries as input and returns an enriched row that include these fields."
"Therefore, to get the required fields for the ecommerce example, design a custom join function that takes two dictionaries as input and returns an enriched row that include these fields.\n"
]
},
{
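
The custom join described in that cell can be sketched as follows. This is a minimal illustration rather than the notebook's exact code, and it assumes both the request row and the Bigtable enrichment result arrive as flat dictionaries keyed by the field names listed above:

```python
from typing import Any, Dict

import apache_beam as beam


def custom_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row:
    """Combines the request row with the enrichment data, keeping only
    the fields the trained model expects."""
    enriched = {}
    for field in ('product_id', 'quantity', 'price', 'customer_id'):
        enriched[field] = left[field]
    # customer_location comes from the Bigtable (enrichment) side; the exact
    # shape of `right` depends on how the source handler decodes the row.
    enriched['customer_location'] = right['customer_location']
    return beam.Row(**enriched)
```

A function like this is what the notebook passes to the enrichment transform in place of the default `cross_join` (via the `join_fn` parameter in the Python SDK).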
@@ -19,11 +19,11 @@

type: edu
files:
- - name: src/org/apache/beam/learning/katas/commontransforms/aggregation/count/Task.java
- visible: true
- placeholders:
- - offset: 1896
- length: 36
- placeholder_text: TODO()
- - name: test/org/apache/beam/learning/katas/commontransforms/aggregation/count/TaskTest.java
- visible: false
+ - name: src/org/apache/beam/learning/katas/commontransforms/aggregation/count/Task.java
+ visible: true
+ placeholders:
+ - offset: 1962
+ length: 36
+ placeholder_text: TODO()
+ - name: test/org/apache/beam/learning/katas/commontransforms/aggregation/count/TaskTest.java
+ visible: false
@@ -19,11 +19,11 @@

type: edu
files:
- - name: src/org/apache/beam/learning/katas/commontransforms/aggregation/max/Task.java
- visible: true
- placeholders:
- - offset: 1899
- length: 42
- placeholder_text: TODO()
- - name: test/org/apache/beam/learning/katas/commontransforms/aggregation/max/TaskTest.java
- visible: false
+ - name: src/org/apache/beam/learning/katas/commontransforms/aggregation/max/Task.java
+ visible: true
+ placeholders:
+ - offset: 1965
+ length: 42
+ placeholder_text: TODO()
+ - name: test/org/apache/beam/learning/katas/commontransforms/aggregation/max/TaskTest.java
+ visible: false
… (diff truncated; remaining changed files not shown)
