
Comparing changes

Choose two branches to see what's changed or to start a new pull request.
base repository: spotify/scio
base: v0.14.12
head repository: spotify/scio
compare: v0.14.13
  • 7 commits
  • 21 files changed
  • 1 contributor

Commits on Feb 19, 2025

  1. Revert 0.14.12 patch (#5610)

    clairemcginty authored Feb 19, 2025 · 596bdc2 (verified)
  2. Fix bq Avro format issue for nested records (#5611)

    clairemcginty authored Feb 19, 2025 · 34729e0 (verified)

Commits on Feb 24, 2025

  1. Upgrade to Beam 2.63.0 (#5613)

    clairemcginty authored Feb 24, 2025 · 166a22a (verified)

Commits on Feb 25, 2025

  1. Fix REPL assembly (#5614)

    clairemcginty authored Feb 25, 2025 · 03ff143 (verified)
  2. Prepare for 0.14.13 release (#5615)

    clairemcginty authored Feb 25, 2025 · b5979af (verified)

Commits on Feb 26, 2025

  1. Use Avro 1.8-compatible APIs in BigQueryType macro (#5616)

    clairemcginty authored Feb 26, 2025 · d306bdb (verified)
  2. Be explicit about using Protobuf 3 in Scio (#5618)

    clairemcginty authored Feb 26, 2025 · 93a20de (verified)
Showing with 583 additions and 254 deletions.
  1. +17 −11 build.sbt
  2. +149 −19 integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala
  3. +25 −6 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
  4. +20 −1 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryUtil.scala
  5. +1 −52 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala
  6. +20 −0 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/SCollectionSyntax.scala
  7. +3 −4 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala
  8. +40 −22 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala
  9. +1 −0 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/MacroUtil.scala
  10. +14 −2 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala
  11. +3 −0 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala
  12. +1 −1 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala
  13. +4 −2 scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala
  14. +1 −44 scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/BigQueryIOTest.scala
  15. +35 −11 ...-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala
  16. +149 −5 ...-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala
  17. +2 −1 scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala
  18. +73 −52 scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaUtilTest.scala
  19. +6 −3 scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/Schemas.scala
  20. +18 −18 scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/TypeProviderTest.scala
  21. +1 −0 site/src/main/paradox/releases/Apache-Beam.md
28 changes: 17 additions & 11 deletions build.sbt
@@ -29,10 +29,10 @@ import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion
 // To test release candidates, find the beam repo and add it as a resolver
 // ThisBuild / resolvers += "apache-beam-staging" at "https://repository.apache.org/content/repositories/"
 val beamVendorVersion = "0.1"
-val beamVersion = "2.62.0"
+val beamVersion = "2.63.0"
 
 // check version used by beam
-// https://github.com/apache/beam/blob/v2.62.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+// https://github.com/apache/beam/blob/v2.63.0/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
 val autoServiceVersion = "1.0.1"
 val autoValueVersion = "1.9"
 val avroVersion = sys.props.getOrElse("avro.version", "1.11.4")
@@ -52,13 +52,14 @@ val httpCoreVersion = "4.4.14"
 val jacksonVersion = "2.15.4"
 val jodaTimeVersion = "2.10.14"
 val nettyVersion = "4.1.110.Final"
+val protobufVersion = "3.25.5"
 val slf4jVersion = "1.7.30"
 val zstdJniVersion = "1.5.6-3"
 // dependent versions
-val googleApiServicesBigQueryVersion = s"v2-rev20240919-$googleClientsVersion"
-val googleApiServicesDataflowVersion = s"v1b3-rev20240817-$googleClientsVersion"
+val googleApiServicesBigQueryVersion = s"v2-rev20241222-$googleClientsVersion"
+val googleApiServicesDataflowVersion = s"v1b3-rev20250106-$googleClientsVersion"
 val googleApiServicesPubsubVersion = s"v1-rev20220904-$googleClientsVersion"
-val googleApiServicesStorageVersion = s"v1-rev20240924-$googleClientsVersion"
+val googleApiServicesStorageVersion = s"v1-rev20241206-$googleClientsVersion"
 // beam tested versions
 val zetasketchVersion = "0.1.0" // sdks/java/extensions/zetasketch/build.gradle
 val flinkVersion = "1.19.0" // runners/flink/1.19/build.gradle
@@ -68,9 +69,9 @@ val sparkVersion = "3.5.0" // runners/spark/3/build.gradle
 val sparkMajorVersion = VersionNumber(sparkVersion).numbers.take(1).mkString(".")
 
 // check recommended versions from libraries-bom
-// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.49.0/index.html
+// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/26.53.0/index.html
 val failureAccessVersion = "1.0.2"
-val checkerQualVersion = "3.47.0"
+val checkerQualVersion = "3.48.3"
 val jsr305Version = "3.0.2"
 val perfmarkVersion = "0.27.0"
 
@@ -494,9 +495,6 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
   ),
   ProblemFilters.exclude[IncompatibleResultTypeProblem](
     "com.spotify.scio.bigquery.types.package#Json.parse"
-  ),
-  ProblemFilters.exclude[DirectMissingMethodProblem](
-    "com.spotify.scio.bigquery.types.package#BigNumeric.bytes"
   )
 )
 
@@ -586,7 +584,11 @@ val commonSettings = bomSettings ++ Def.settings(
     "joda-time" % "joda-time" % jodaTimeVersion,
     "org.apache.httpcomponents" % "httpclient" % httpClientVersion,
     "org.apache.httpcomponents" % "httpcore" % httpCoreVersion,
-    "org.slf4j" % "slf4j-api" % slf4jVersion // slf4j-bom only available for v2
+    "org.slf4j" % "slf4j-api" % slf4jVersion, // slf4j-bom only available for v2
+    // remove and let BOM override version after Beam upgrades to 4.x
+    // see: https://github.com/spotify/scio/issues/5617
+    "com.google.protobuf" % "protobuf-java" % protobufVersion,
+    "com.google.protobuf" % "protobuf-java-util" % protobufVersion
   ),
   // libs to help with cross-build
   libraryDependencies ++= Seq(
@@ -1053,6 +1055,7 @@ lazy val `scio-google-cloud-platform` = project
       moduleFilter("org.apache.arrow", "arrow-vector"),
       moduleFilter("com.fasterxml.jackson.datatype", "jackson-datatype-jsr310")
     ).reduce(_ | _),
+    Test / javaOptions += "-Doverride.type.provider=com.spotify.scio.bigquery.validation.SampleOverrideTypeProvider",
     libraryDependencies ++= Seq(
       // compile
       "com.esotericsoftware" % "kryo-shaded" % kryoVersion,
@@ -1670,6 +1673,9 @@ lazy val `scio-repl` = project
       case PathList("commonMain", _*) =>
         // drop conflicting squareup linkdata
         MergeStrategy.discard
+      case PathList("mozilla", "public-suffix-list.txt") =>
+        // drop conflicting suffix lists from beam-vendor-grpc, httpclient
+        MergeStrategy.discard
       case PathList("META-INF", "io.netty.versions.properties") =>
         // merge conflicting netty property files
         MergeStrategy.filterDistinctLines
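
For context, these new cases slot into sbt-assembly's standard assemblyMergeStrategy setting. A minimal sketch of that wiring, assuming the sbt-assembly 1.x key syntax, with the usual fall-through to the previously configured strategy:

    // build.sbt sketch (assumed sbt-assembly 1.x): route known-conflicting
    // files to an explicit strategy, defer everything else to the default
    assembly / assemblyMergeStrategy := {
      case PathList("mozilla", "public-suffix-list.txt") =>
        // several jars bundle a copy; drop them all rather than pick a winner
        MergeStrategy.discard
      case PathList("META-INF", "io.netty.versions.properties") =>
        // per-module netty property files: keep the union of distinct lines
        MergeStrategy.filterDistinctLines
      case other =>
        // fall through to whatever strategy was previously configured
        val previous = (assembly / assemblyMergeStrategy).value
        previous(other)
    }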
168 changes: 149 additions & 19 deletions integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala
@@ -21,11 +21,14 @@ import com.google.protobuf.ByteString
 import com.spotify.scio.avro._
 import com.spotify.scio.coders.Coder
 import com.spotify.scio.bigquery.BigQueryTypedTable.Format
+import com.spotify.scio.bigquery.BigQueryTypedTable.Format.GenericRecordWithLogicalTypes
 import com.spotify.scio.bigquery.client.BigQuery
 import com.spotify.scio.bigquery.types.{BigNumeric, Geography, Json}
 import com.spotify.scio.testing._
 import magnolify.scalacheck.auto._
+import org.apache.avro.UnresolvedUnionException
 import org.apache.avro.generic.GenericRecord
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{Method => WriteMethod}
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
 import org.joda.time.format.DateTimeFormat
@@ -35,6 +38,8 @@ import org.scalatest.BeforeAndAfterAll
 import scala.util.{Random, Try}
 
 object TypedBigQueryIT {
+  case class Nested(int: Int)
+
   @BigQueryType.toTable
   case class Record(
     bool: Boolean,
@@ -48,7 +53,32 @@ object TypedBigQueryIT {
     timestamp: Instant,
     date: LocalDate,
     time: LocalTime,
-    datetime: LocalDateTime,
+    // BQ DATETIME is problematic with avro as BQ api uses different representations:
+    // - BQ export uses 'string(datetime)'
+    // - BQ load uses 'long(local-timestamp-micros)'
+    // BigQueryType avroSchema favors read with string type
+    // datetime: LocalDateTime,
     geography: Geography,
     json: Json,
+    bigNumeric: BigNumeric,
+    nestedRequired: Nested,
+    nestedOptional: Option[Nested]
+  )
+
+  // A record with no nested record types
+  @BigQueryType.toTable
+  case class FlatRecord(
+    bool: Boolean,
+    int: Int,
+    long: Long,
+    float: Float,
+    double: Double,
+    numeric: BigDecimal,
+    string: String,
+    byteString: ByteString,
+    timestamp: Instant,
+    date: LocalDate,
+    time: LocalTime,
+    geography: Geography,
+    json: Json,
     bigNumeric: BigNumeric
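
Unpacking the comment above: the same DATETIME column is described by two different Avro schemas depending on direction, so a single avroSchema cannot serve both paths. A sketch of the two shapes the comment refers to (field name hypothetical, schema JSON abbreviated):

    // On BQ export (read path), DATETIME arrives as an annotated string:
    //   {"name": "dt", "type": {"type": "string", "logicalType": "datetime"}}
    // On BQ load (write path), the service expects a long with the
    // local-timestamp-micros logical type:
    //   {"name": "dt", "type": {"type": "long", "logicalType": "local-timestamp-micros"}}
    // BigQueryType.avroSchema picks the string form, favoring reads, which is
    // why the datetime field is commented out of the write-oriented Record above.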
@@ -77,12 +107,23 @@ object TypedBigQueryIT {
       y <- Gen.numChar
     } yield Geography(s"POINT($x $y)")
   )
-  implicit val arbJson: Arbitrary[Json] = Arbitrary(
-    for {
-      key <- Gen.alphaStr
-      value <- Gen.alphaStr
-    } yield Json(s"""{"$key":"$value"}""")
-  )
+  implicit val arbJson: Arbitrary[Json] = Arbitrary {
+    import Arbitrary._
+    import Gen._
+    Gen
+      .oneOf(
+        // json object
+        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
+        // json array
+        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
+        // json literals
+        alphaLowerStr.map(str => s""""$str""""),
+        arbInt.arbitrary.map(_.toString),
+        arbBool.arbitrary.map(_.toString),
+        Gen.const("null")
+      )
+      .map(wkt => Json(wkt))
+  }
 
   implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
     // Precision: 76.76 (the 77th digit is partial)
@@ -100,11 +141,14 @@ object TypedBigQueryIT {
     s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}"
     Table.Spec(spec)
   }
-  private val typedTable = table("records")
+  private val typedTableFileLoads = table("records_fileloads")
+  private val typedTableStorage = table("records_storage")
   private val tableRowTable = table("records_tablerow")
   private val avroTable = table("records_avro")
+  private val tableRowStorage = table("records_tablerow_storage")
+  private val avroFlatTable = table("records_avro_flat")
 
-  private val records = Gen.listOfN(100, recordGen).sample.get
+  private val records = Gen.listOfN(5, recordGen).sample.get
   private val options = PipelineOptionsFactory
     .fromArgs(
       "--project=data-integration-test",
@@ -116,30 +160,66 @@ object TypedBigQueryIT {
 class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
   import TypedBigQueryIT._
 
+  private val bq = BigQuery.defaultInstance()
+
   override protected def afterAll(): Unit = {
-    val bq = BigQuery.defaultInstance()
     // best effort cleanup
-    Try(bq.tables.delete(typedTable.ref))
+    Try(bq.tables.delete(typedTableFileLoads.ref))
+    Try(bq.tables.delete(typedTableStorage.ref))
     Try(bq.tables.delete(tableRowTable.ref))
     Try(bq.tables.delete(avroTable.ref))
+    Try(bq.tables.delete(tableRowStorage.ref))
+    Try(bq.tables.delete(avroFlatTable.ref))
   }
 
-  "TypedBigQuery" should "handle records as TableRow" in {
+  "TypedBigQuery" should "write case classes using FileLoads API" in {
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
-        .saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
+        .saveAsTypedBigQueryTable(
+          typedTableFileLoads,
+          createDisposition = CREATE_IF_NEEDED,
+          method = WriteMethod.FILE_LOADS
+        )
     }.waitUntilFinish()
 
     runWithRealContext(options) { sc =>
-      val data = sc.typedBigQuery[Record](typedTable)
+      val data = sc.typedBigQuery[Record](typedTableFileLoads)
       data should containInAnyOrder(records)
     }
   }
 
-  "BigQueryTypedTable" should "handle records as TableRow format" in {
+  it should "write case classes using Storage Write API" in {
+    // Storage write API has a bug impacting TIME field writes: https://github.com/apache/beam/issues/34038
+    // Todo remove when fixed
+    the[IllegalArgumentException] thrownBy {
+      runWithRealContext(options) { sc =>
+        sc.parallelize(records)
+          .saveAsTypedBigQueryTable(
+            typedTableStorage,
+            createDisposition = CREATE_IF_NEEDED,
+            method = WriteMethod.STORAGE_WRITE_API
+          )
+      }.waitUntilFinish()
+
+      runWithRealContext(options) { sc =>
+        val data = sc.typedBigQuery[Record](typedTableStorage)
+        data should containInAnyOrder(records)
+      }
+    } should have message "TIME schemas are not currently supported for Typed Storage Write API writes. Please use Write method FILE_LOADS instead, or map case classes using BigQueryType.toTableRow and use saveAsBigQueryTable directly."
+  }
+
+  it should "write case classes manually converted to TableRows using FileLoads API" in {
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toTableRow)
+        .map { row =>
+          // TableRow BQ save API uses json
+          // To disambiguate from literal json string,
+          // field MUST be converted to parsed JSON
+          val jsonLoadRow = new TableRow()
+          jsonLoadRow.putAll(row.asInstanceOf[java.util.Map[String, _]]) // cast for 2.12
+          jsonLoadRow.set("json", Json.parse(row.getJson("json")))
+        }
        .saveAsBigQueryTable(
          tableRowTable,
          schema = Record.schema,
@@ -153,9 +233,9 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
     }
   }
 
-  // TODO fix if in beam 2.61
-  ignore should "handle records as avro format" in {
+  it should "write case classes manually converted to GenericRecords using FileLoads API" in {
     implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
 
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toAvro)
@@ -167,8 +247,58 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
     }.waitUntilFinish()
 
     runWithRealContext(options) { sc =>
-      val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
-      data should containInAnyOrder(records)
+      sc.typedBigQuery[Record](avroTable) should containInAnyOrder(records)
     }
+
+    // Due to Beam bug with automatic schema detection, can't parse nested record types as GenericRecords yet
+    // Todo remove assertThrows after fixing in Beam
+    assertThrows[UnresolvedUnionException] {
+      runWithRealContext(options) { sc =>
+        val data =
+          sc.bigQueryTable(avroTable, format = GenericRecordWithLogicalTypes)
+            .map(Record.fromAvro)
+        data should containInAnyOrder(records)
+      }
+    }
   }
+
+  it should "write case classes manually converted to TableRows using Storage Write API" in {
+    runWithRealContext(options) { sc =>
+      sc.parallelize(records)
+        .map(Record.toTableRow)
+        .saveAsBigQueryTable(
+          tableRowStorage,
+          schema = Record.schema,
+          createDisposition = CREATE_IF_NEEDED,
+          method = WriteMethod.STORAGE_WRITE_API
+        )
+    }.waitUntilFinish()
+
+    runWithRealContext(options) { sc =>
+      sc.typedBigQuery[Record](tableRowStorage) should containInAnyOrder(records)
+    }
+  }
+
+  it should "read BigQuery rows into GenericRecords and convert them to case classes for records without nested types" in {
+    implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(FlatRecord.avroSchema)
+
+    val flatRecords = Gen.listOfN(5, implicitly[Arbitrary[FlatRecord]].arbitrary).sample.get
+
+    runWithRealContext(options) { sc =>
+      sc.parallelize(flatRecords)
+        .map(FlatRecord.toAvro)
+        .saveAsBigQueryTable(
+          avroFlatTable,
+          schema = FlatRecord.schema,
+          createDisposition = CREATE_IF_NEEDED
+        )
+    }.waitUntilFinish()
+
+    runWithRealContext(options) { sc =>
+      val data =
+        sc.bigQueryTable(avroFlatTable, Format.GenericRecordWithLogicalTypes)
+          .map(FlatRecord.fromAvro)
+      data should containInAnyOrder(flatRecords)
+    }
+  }
 }
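
Read together, these tests document which write path to choose today. A condensed usage sketch of the two working options they exercise (table specs hypothetical; sc is an existing ScioContext):

    import com.spotify.scio.bigquery._
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{Method => WriteMethod}

    // Option 1: typed FILE_LOADS, handles the full schema including TIME
    sc.parallelize(records)
      .saveAsTypedBigQueryTable(
        Table.Spec("myproject:mydataset.records"), // hypothetical table
        createDisposition = CREATE_IF_NEEDED,
        method = WriteMethod.FILE_LOADS
      )

    // Option 2: Storage Write API via the workaround named in the error
    // message above: convert to TableRow first and write untyped
    sc.parallelize(records)
      .map(Record.toTableRow)
      .saveAsBigQueryTable(
        Table.Spec("myproject:mydataset.records_rows"), // hypothetical table
        schema = Record.schema,
        createDisposition = CREATE_IF_NEEDED,
        method = WriteMethod.STORAGE_WRITE_API
      )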
31 changes: 25 additions & 6 deletions scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala
@@ -741,12 +741,31 @@ object BigQueryTyped {
     override type ReadP = Unit
     override type WriteP = Table.WriteParam[T]
 
-    private val underlying = BigQueryTypedTable[T](
-      (i: SchemaAndRecord) => BigQueryType[T].fromAvro(i.getRecord),
-      BigQueryType[T].toTableRow,
-      BigQueryType[T].fromTableRow,
-      table
-    )
+    private val underlying = {
+      val readFn = Functions.serializableFn[SchemaAndRecord, T] { x =>
+        BigQueryType[T].fromAvro(x.getRecord)
+      }
+      val writeFn = Functions.serializableFn[AvroWriteRequest[T], GenericRecord] { x =>
+        BigQueryType[T].toAvro(x.getElement)
+      }
+      val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ =>
+        BigQueryType[T].avroSchema
+      }
+      val parseFn = (r: GenericRecord, _: TableSchema) => BigQueryType[T].fromAvro(r)
+
+      BigQueryTypedTable[T](
+        beam.BigQueryIO
+          .read(readFn)
+          .useAvroLogicalTypes(),
+        beam.BigQueryIO
+          .write[T]()
+          .withAvroFormatFunction(writeFn)
+          .withAvroSchemaFactory(schemaFactory)
+          .useAvroLogicalTypes(),
+        table,
+        parseFn
+      )
+    }
 
     override def testId: String = s"BigQueryIO(${table.spec})"
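
The practical effect of this change is that BigQueryType-annotated classes now round-trip through Avro with logical types on both the read and write sides. A minimal end-user sketch (case class and table names hypothetical):

    import com.spotify.scio.bigquery._

    @BigQueryType.toTable
    case class WordCount(word: String, count: Long)

    // read: Beam hands back GenericRecords decoded with logical types,
    // and BigQueryType[WordCount].fromAvro maps them onto the case class
    val counts = sc.typedBigQuery[WordCount](Table.Spec("myproject:mydataset.word_count"))

    // write: toAvro plus avroSchema feed Beam's Avro format function
    // and schema factory shown in the diff above
    counts.saveAsTypedBigQueryTable(Table.Spec("myproject:mydataset.word_count_out"))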
