BinaryIO reads break when file size > 64 MB #5472

Closed · clairemcginty opened this issue Sep 5, 2024 · 1 comment

@clairemcginty (Contributor) commented:
Dataflow tries to break the file into offset splits of desiredBundleSizeBytes, which we've set to 64 MB, even though binary files should not be split.
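For intuition, here's a simplified sketch (not Beam's actual code) of how that read expansion carves a file into fixed-size offset ranges, and why a ~280 MB file read with 64 MB bundles produces the nonzero start offsets that CompressedSource later rejects:

// Simplified illustration only: carve a file of `len` bytes into
// desiredBundleSizeBytes-sized offset ranges, the way the read-all expansion
// does before constructing a per-range source for each split.
def splitRanges(len: Long, desiredBundleSizeBytes: Long): Seq[(Long, Long)] =
  (0L until len by desiredBundleSizeBytes)
    .map(start => (start, math.min(start + desiredBundleSizeBytes, len)))

// With 64 MiB bundles, a ~280 MiB file yields range starts at
// 0, 67108864, 134217728, 201326592, 268435456 -- every range after the
// first begins at a nonzero offset, which CompressedSource refuses to read.
splitRanges(280L * 1024 * 1024, 64L * 1024 * 1024).map(_._1)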

Sample repro:

import java.io.InputStream
import java.util.UUID

import com.spotify.scio._
import org.apache.beam.sdk.util.CoderUtils
// AvroCoder lives in org.apache.beam.sdk.extensions.avro.coders on newer Beam
// versions (org.apache.beam.sdk.coders on older ones); Account is the
// generated Avro test class.

object TestBinaryWrite {
  val coder = AvroCoder.of(classOf[Account]) // used to map Avro -> bytes

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    args("method") match {
      // produces a single file of roughly 280 MB
      case "write" =>
        sc.parallelize(1 to 10_000)
          .flatMap(i => (1 to 250).map(_ * i))
          .map { i =>
            Account
              .newBuilder()
              .setId(i)
              .setAmount(i.toDouble)
              .setName(UUID.randomUUID().toString)
              .setType("checking")
              .build()
          }
          .map(CoderUtils.encodeToByteArray(coder, _))
          .saveAsBinaryFile(args("output"), numShards = 1)
      // read the file we just wrote
      case "read" =>
        sc.binaryFile(args("input"), reader = MyBinaryReader)
    }

    sc.run()
  }

  // A completely meaningless implementation; its contents don't matter for this bug
  case object MyBinaryReader extends BinaryFileReader {
    override type State = Int
    override def start(is: InputStream): Int = 1
    override def readRecord(state: Int, is: InputStream): (Int, Array[Byte]) = {
      val buf = new Array[Byte](1000)
      is.read(buf) // may read fewer than 1000 bytes; fine for a repro
      (state, buf)
    }
  }
}

which, on read, throws this error in Dataflow:

Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
	org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
	org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
	org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:432)
	org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:89)
	org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
	org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:432)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:412)
	org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
	org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
	org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
	org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
	org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
	... 22 more
Caused by: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
	org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:204)
	org.apache.beam.sdk.io.CompressedSource.<init>(CompressedSource.java:232)
	org.apache.beam.sdk.io.CompressedSource.createForSubrangeOfFile(CompressedSource.java:259)
	org.apache.beam.sdk.io.ReadAllViaFileBasedSourceTransform$AbstractReadFileRangesFn.process(ReadAllViaFileBasedSourceTransform.java:131)

My guess is that the splittability of the BinaryIO reader implementation is misconfigured, or that there's some way to direct ReadAllViaFileBasedSource not to split the source into offset ranges. We should compare against an existing non-splittable Beam source (I think TFRecordIO is one such example).

@clairemcginty (Contributor, Author) commented:
Upon following my own advice, I tried copying TFRecordIO's technique of setting desiredBundleSizeBytes to Long.MAX_VALUE to avoid splitting, and it solved the problem 👍
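For reference, a minimal sketch of that technique against Beam's public API. The ReadAllViaFileBasedSource constructor is real; createBinarySource is a hypothetical factory standing in for Scio's internal binary FileBasedSource:

import org.apache.beam.sdk.coders.ByteArrayCoder
import org.apache.beam.sdk.io.{FileBasedSource, ReadAllViaFileBasedSource}
import org.apache.beam.sdk.transforms.SerializableFunction

// Hypothetical factory standing in for Scio's internal binary FileBasedSource.
def createBinarySource: SerializableFunction[String, FileBasedSource[Array[Byte]]] = ???

// With desiredBundleSizeBytes = Long.MaxValue, each file becomes a single
// [0, fileSize) range, so CompressedSource is never asked to start reading
// at a nonzero offset -- the same technique TFRecordIO uses.
val readAll = new ReadAllViaFileBasedSource[Array[Byte]](
  Long.MaxValue,       // desiredBundleSizeBytes: effectively "never split"
  createBinarySource,
  ByteArrayCoder.of()
)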
