Я пытаюсь обработать значительное количество записей, используя облачный поток данных. Мой источник — облачное хранилище Google, а мой приемник — облачный SQL (MySQL). У меня есть следующий код для записи в приемник (Cloud SQL).
p.apply()
....
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://google/<DBNAME>?cloudSqlInstance=<INSTANCE_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<USERNAME>&password=<PASSWORD>&useSSL=false"
)
)
Вышеприведенное отлично работает, когда я запускаю конвейер, используя DirectRunner
. Но он выдает NullPointer Exception
при запуске на DataflowRunner
. Исключение составляет следующее:
java.lang.NullPointerException
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.executeBatch(JdbcIO.java:775)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Write$WriteFn.finishBundle(JdbcIO.java:755)
Beam Version = 2.16.0, 2.15.0 - пробовал обе версии, но не удалось. Любая причина, почему это происходит? Как заставить его работать с DataflowRunner
?