メインコンテンツまでスキップ
バージョン: 26.x

Spark Doris Connector

Spark Doris Connectorは、Sparkを通じてDorisに保存されているデータの読み取りと、Dorisへのデータ書き込みをサポートできます。

Github: https://github.com/apache/doris-spark-connector

  • RDDDataFrame、およびSpark SQLを通じてDorisからバッチモードでのデータ読み取りをサポート。DataFrameまたはSpark SQLの使用を推奨
  • DataFrame APIおよびSpark SQLを使用したバッチモードまたはストリーミングモードでのDorisへのデータ書き込みをサポート
  • DorisテーブルをDataFrameまたはRDDにマッピング可能。DataFrameの使用を推奨
  • Doris側でのデータフィルタリングの完了をサポートし、データ転送量を削減

バージョン互換性

ConnectorSparkDorisJavaScala
25.1.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.13.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.03.5 - 3.1, 2.41.0 +82.12, 2.11
24.0.03.5 ~ 3.1, 2.41.0 +82.12, 2.11
1.3.23.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.682.12, 2.11
1.3.13.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.082.12, 2.11
1.3.03.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.082.12, 2.11
1.2.03.2, 3.1, 2.31.0 ~ 2.0.282.12, 2.11
1.1.03.2, 3.1, 2.31.0 ~ 1.2.882.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

使用方法

Maven

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-spark-3.5</artifactId>
<version>25.1.0</version>
</dependency>

::: tip

バージョン24.0.0から、Doris connectorパッケージの命名規則が調整されました:

  1. Scalaバージョン情報を含まなくなりました。
  2. Spark 2.xバージョンでは、spark-doris-connector-spark-2という名前のパッケージを統一して使用し、デフォルトではScala 2.11バージョンのみをベースにコンパイルします。Scala 2.12バージョンが必要な場合は、自分でコンパイルしてください。
  3. Spark 3.xバージョンでは、特定のSparkバージョンに応じてspark-doris-connector-spark-3.xという名前のパッケージを使用します。Spark 3.0バージョンベースのアプリケーションは、パッケージspark-doris-connector-spark-3.1を使用できます。

:::

注意

  1. 異なるSparkおよびScalaバージョンに応じて、対応するConnectorバージョンを置き換えてください。

  2. こちらから関連バージョンのjarパッケージをダウンロードすることもできます。

コンパイル

コンパイル時には、直接sh build.shを実行できます。詳細については、こちらを参照してください。

コンパイル成功後、distディレクトリにターゲットjarパッケージが生成されます(例:spark-doris-connector-spark-3.5-25.1.0.jar)。このファイルをSparkClassPathにコピーしてSpark-Doris-Connectorを使用します。例えば、Localモードで実行しているSparkの場合、このファイルをjars/フォルダに置きます。Yarnクラスターモードで実行しているSparkの場合、このファイルを事前デプロイパッケージに置きます。 次のこともできます

ソースコードディレクトリで実行:

sh build.sh

プロンプトに従って、コンパイルが必要なScalaおよびSparkバージョンを入力します。

コンパイル成功後、distディレクトリにターゲットjarパッケージが生成されます(例:spark-doris-connector-spark-3.5-25.1.0.jar)。 このファイルをSparkClassPathにコピーしてSpark-Doris-Connectorを使用します。

例えば、SparkLocalモードで実行している場合、このファイルをjars/フォルダに置きます。SparkYarnクラスターモードで実行している場合、このファイルを事前デプロイパッケージに置きます。

例えば、spark-doris-connector-spark-3.5-25.1.0.jarをhdfsにアップロードし、hdfs上のJarパッケージパスをspark.yarn.jarsパラメータに追加します


1. Upload `spark-doris-connector-spark-3.5-25.1.0.jar` to hdfs.

hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.1.0.jar /spark-jars/

2. Add the `spark-doris-connector-spark-3.5-25.1.0.jar` dependency in the cluster.
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.1.0.jar

バッチ読み取り

RDD

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()

DataFrame

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

pySpark

dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
// show 5 lines data
dorisSparkDF.show(5)

Arrow Flight SQL による読み取り

バージョン 24.0.0 以降では、Arrow Flight SQL 経由でデータを読み取ることができます(Doris バージョン >= 2.1.0 が必要です)。

doris.read.mode を arrow に設定し、doris.read.arrow-flight-sql.port を FE で設定された Arrow Flight SQL ポートに設定してください。

サーバー設定については、Arrow Flight SQL に基づく高速データ転送リンク を参照してください。

val df = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.user", "$YOUR_DORIS_USERNAME")
.option("doris.password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.mode", "arrow")
.option("doris.read.arrow-flight-sql.port", "12345")
.load()

df.show()

バッチ書き込み

DataFrame

val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
// Support setting Overwrite mode to overwrite data
// .mode(SaveMode.Overwrite)
.save()

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- insert into select
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
-- insert overwrite
INSERT OVERWRITE SELECT * FROM YOUR_TABLE;

ストリーミング書き込み

DataFrame

構造化データの書き込み
val df = spark.readStream.format("your_own_stream_source").load()

df.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.start()
.awaitTermination()
直接書き込み

データストリーム内のデータの最初の列がDorisテーブル構造に準拠したフォーマット済みデータである場合、例えば同じ列順序のCSVフォーマットデータや同じフィールド名のJSONフォーマットデータなど、doris.sink.streaming.passthroughオプションをtrueに設定することでDataFrameに変換することなく直接Dorisに書き込むことができます。

kafkaを例に取ります。

そして、書き込まれるテーブル構造を以下のように想定します:

CREATE TABLE `t2` (
`c0` int NULL,
`c1` varchar(10) NULL,
`c2` date NULL
) ENGINE=OLAP
DUPLICATE KEY(`c0`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

メッセージの値は{"c0":1,"c1":"a","dt":"2024-01-01"}のjson形式です。

val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.load()

// Select the value of the message as the first column of the DataFrame.
kafkaSource.selectExpr("CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// Set this option to true, and the first column will be written directly without processing.
.option("doris.sink.streaming.passthrough", "true")
.option("doris.sink.properties.format", "json")
.start()
.awaitTermination()

JSON形式で書き込み

doris.sink.properties.formatをjsonに設定

val df = spark.readStream.format("your_own_stream_source").load() 
df.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.sink.properties.format", "json")
.save()

Spark Doris Catalog

バージョン24.0.0以降、Spark Catalogを通じてDorisへのアクセスをサポートしています。

Catalog設定

キー必須コメント
spark.sql.catalog.your_catalog_nametruecatalogプロバイダーのクラス名を設定します。Dorisの場合、唯一の有効な値はorg.apache.doris.spark.catalog.DorisTableCatalogです
spark.sql.catalog.your_catalog_name.doris.fenodestrueDoris FEノードをfe_ip:fe_http_portの形式で設定します
spark.sql.catalog.your_catalog_name.doris.query.portfalseDoris FEクエリポートを設定します。spark.sql.catalog.your_catalog_name.doris.fe.auto.fetchがtrueに設定されている場合、このオプションは不要です
spark.sql.catalog.your_catalog_name.doris.usertrueDorisユーザーを設定します
spark.sql.catalog.your_catalog_name.doris.passwordtrueDorisパスワードを設定します
spark.sql.defaultCatalogfalseSpark SQLデフォルトcatalogを設定します
ヒント

DataFrameとSpark SQLに適用されるすべてのコネクターパラメーターをcatalogに設定することができます。
例えば、データをjson形式で書き込みたい場合は、オプションspark.sql.catalog.your_catalog_name.doris.sink.properties.formatjsonに設定できます。

DataFrame

val conf = new SparkConf()
conf.set("spark.sql.catalog.your_catalog_name", "org.apache.doris.spark.catalog.DorisTableCatalog")
conf.set("spark.sql.catalog.your_catalog_name.doris.fenodes", "192.168.0.1:8030")
conf.set("spark.sql.catalog.your_catalog_name.doris.query.port", "9030")
conf.set("spark.sql.catalog.your_catalog_name.doris.user", "root")
conf.set("spark.sql.catalog.your_catalog_name.doris.password", "")
val spark = builder.config(conf).getOrCreate()
spark.sessionState.catalogManager.setCurrentCatalog("your_catalog_name")

// show all databases
spark.sql("show databases")

// use databases
spark.sql("use your_doris_db")

// show tables in test
spark.sql("show tables")

// query table
spark.sql("select * from your_doris_table")

// write data
spark.sql("insert into your_doris_table values(xxx)")

Spark SQL

必要な設定でSpark SQL CLIを開始します。

spark-sql \
--conf "spark.sql.catalog.your_catalog_name=org.apache.doris.spark.catalog.DorisTableCatalog" \
--conf "spark.sql.catalog.your_catalog_name.doris.fenodes=192.168.0.1:8030" \
--conf "spark.sql.catalog.your_catalog_name.doris.query.port=9030" \
--conf "spark.sql.catalog.your_catalog_name.doris.user=root" \
--conf "spark.sql.catalog.your_catalog_name.doris.password=" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

Spark SQL CLIでクエリを実行します。

-- show all databases
show databases;

-- use databases
use your_doris_db;

-- show tables in test
show tables;

-- query table
select * from your_doris_table;

-- write data
insert into your_doris_table values(xxx);
insert into your_doris_table select * from your_source_table;

-- access table with full name
select * from your_catalog_name.your_doris_db.your_doris_table;
insert into your_catalog_name.your_doris_db.your_doris_table values(xxx);
insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table;

設定

一般

KeyDefault ValueComment
doris.fenodes--Doris FE httpアドレス、複数のアドレスをサポート、カンマ区切り
doris.table.identifier--Dorisテーブル識別子、例:db1.tbl1
doris.user--Dorisユーザー名
doris.passwordEmpty stringDorisパスワード
doris.request.retries3Dorisにリクエストを送信するリトライ回数
doris.request.connect.timeout.ms30000Dorisにリクエストを送信する際の接続タイムアウト
doris.request.read.timeout.ms30000Dorisにリクエストを送信する際の読み取りタイムアウト
doris.request.query.timeout.s21600dorisのクエリタイムアウト時間、デフォルトは6時間、-1はタイムアウト制限なしを意味する
doris.request.tablet.size1一つのRDD Partitionに対応するDoris Tabletの数。この値を小さく設定するほど、より多くのパーティションが生成される。これによりSpark側の並列性が向上するが、同時にDorisにより大きな負荷を与える。
doris.read.field--Dorisテーブル内のカラム名リスト、カンマ区切り
doris.batch.size4064BEから一度に読み取るデータの最大行数。この値を増やすとSparkとDoris間の接続数を削減できる。これによりネットワーク遅延による追加の時間オーバーヘッドを削減できる。
doris.exec.mem.limit8589934592単一クエリのメモリ制限。デフォルトは8GB、バイト単位。
doris.write.fields--Dorisテーブルに書き込むフィールド(またはフィールドの順序)を指定、フィールドはカンマ区切り。
デフォルトでは、すべてのフィールドがDorisテーブルフィールドの順序で書き込まれる。
doris.sink.batch.size500000単一のBE書き込みでの最大行数
doris.sink.max-retries0BE書き込み後のリトライ回数、バージョン1.3.0以降、デフォルト値は0で、デフォルトではリトライは実行されない。このパラメータが0より大きく設定された場合、バッチレベルの失敗リトライが実行され、doris.sink.batch.sizeで設定されたサイズのデータがSpark Executorメモリにキャッシュされる。メモリ割り当てを適切に増やす必要がある場合がある。
doris.sink.retry.interval.ms10000リトライ回数を設定した後の各リトライ間隔、ミリ秒単位
doris.sink.properties.format--stream loadのデータ形式。
サポート形式:csv、json、arrow
詳細なマルチパラメータの詳細
doris.sink.properties.*--Stream Loadのインポートパラメータ。
例:
カラム区切り文字を指定:'doris.sink.properties.column_separator' = ','
パラメータの詳細
doris.sink.task.partition.size--書き込みタスクに対応するパーティション数。フィルタリングおよびその他の操作後、Spark RDDで書き込まれるパーティション数は多くなる可能性があるが、各Partitionに対応するレコード数は比較的少なく、書き込み頻度の増加と計算リソースの無駄を招く。この値を小さく設定するほど、Doris書き込み頻度が少なくなり、Dorisマージ圧力が小さくなる。一般的にdoris.sink.task.use.repartitionと併用される。
doris.sink.task.use.repartitionfalseDorisが書き込むパーティション数を制御するためにrepartitionモードを使用するかどうか。デフォルト値はfalseで、coalesceが使用される(注意:書き込み前にSparkアクションがない場合、全体の計算の並列性が低下する)。trueに設定された場合、repartitionが使用される(注意:シャッフルのコストで最終的なパーティション数を設定できる)。
doris.sink.batch.interval.ms0各バッチsinkの間隔時間、ミリ秒単位。
doris.sink.enable-2pcfalse2段階コミットを有効にするかどうか。有効にすると、トランザクションはジョブの終了時にコミットされ、一部のタスクが失敗した場合、すべてのプリコミットトランザクションがロールバックされる。
doris.sink.auto-redirecttrueStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFEを通じて書き込みを行い、BE情報を明示的に取得しなくなる。
doris.enable.httpsfalseFE Httpsリクエストを有効にするかどうか。
doris.https.key-store-path-Httpsキーストアパス。
doris.https.key-store-typeJKSHttpsキーストアタイプ。
doris.https.key-store-password-Httpsキーストアパスワード。
doris.read.modethriftDoris読み取りモード、オプションはthriftarrow
doris.read.arrow-flight-sql.port-Doris FEのArrow Flight SQLポート。doris.read.modearrowの場合、Arrow Flight SQLを介してデータを読み取るために使用される。サーバー設定については、Arrow Flight SQLベースの高速データ転送リンクを参照
doris.sink.label.prefixspark-dorisStream Loadモードで書き込む際のインポートラベルプレフィックス。
doris.thrift.max.message.size2147483647Thriftを介してデータを読み取る際のメッセージの最大サイズ。
doris.fe.auto.fetchfalseFE情報を自動取得するかどうか。trueに設定すると、doris.fenodesで設定されたノードに従ってすべてのFEノード情報がリクエストされる。複数のノードを設定したり、doris.read.arrow-flight-sql.portdoris.query.portを個別に設定したりする必要はない。
doris.read.bitmap-to-stringfalseBitmapタイプを読み取り用に配列インデックスで構成された文字列に変換するかどうか。具体的な結果形式については、関数定義BITMAP_TO_STRINGを参照。
doris.read.bitmap-to-base64falseBitmapタイプを読み取り用にBase64エンコードされた文字列に変換するかどうか。具体的な結果形式については、関数定義BITMAP_TO_BASE64を参照。
doris.query.port-Doris FEクエリポート、Catalogのメタデータの上書きと取得に使用される。

SQL & Dataframe設定

KeyDefault ValueComment
doris.filter.query.in.max.count100述語プッシュダウンにおいて、in式の値リスト内の要素の最大数。この数を超えた場合、in式の条件フィルタリングはSpark側で処理される。

Structured Streaming設定

KeyDefault ValueComment
doris.sink.streaming.passthroughfalse処理せずに最初のカラムの値を直接書き込む。

RDD設定

KeyDefault ValueComment
doris.request.auth.user--Dorisユーザー名
doris.request.auth.password--Dorisパスワード
doris.filter.query--クエリのフィルタ式、Dorisに透過的に送信される。Dorisはこの式を使用してソース側データフィルタリングを完了する。

Doris & Sparkカラムタイプマッピング

DorisタイプSparkタイプ
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDecimalType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
JSONDataTypes.StringType
VARIANTDataTypes.StringType
TIMEDataTypes.DoubleType
HLLDataTypes.StringType
BitmapDataTypes.StringType
ヒント

バージョン24.0.0以降、Bitmapタイプの戻り値タイプはstringタイプで、デフォルトの戻り値は文字列値Read unsupportedです。

FAQ

  1. Bitmapタイプの書き込み方法

    Spark SQLでinsert intoを通じてデータを書き込む際、dorisのターゲットテーブルにBITMAPまたはHLLタイプのデータが含まれている場合、オプションdoris.ignore-typeを対応するタイプに設定し、doris.write.fieldsを通じてカラムをマップする必要があります。使用方法は以下の通りです:

    BITMAP

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
    "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
    "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "user"="$YOUR_DORIS_USERNAME",
    "password"="$YOUR_DORIS_PASSWORD"
    "doris.ignore-type"="bitmap",
    "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
    );

HLL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
"doris.ignore-type"="hll",
"doris.write.fields"="col1,hll_col1=hll_hash(col1)"
);
```
ヒント

バージョン24.0.0以降、doris.ignore-typeは非推奨となり、書き込み時にこのパラメータを追加する必要はありません。

  1. overwriteを使用して書き込む方法

    バージョン1.3.0以降、overwriteモードでの書き込みがサポートされています(全テーブルレベルでのデータ上書きのみサポート)。具体的な使用方法は以下の通りです:

    DataFrame

    resultDf.format("doris")
    .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    // your own options
    .mode(SaveMode.Overwrite)
    .save()

SQL

```sparksql
INSERT OVERWRITE your_target_table SELECT * FROM your_source_table
```

3. Bitmap型の読み取り方法

バージョン24.0.0以降、Arrow Flight SQLを通じて変換されたBitmapデータの読み取りをサポートしています(Dorisバージョン >= 2.1.0が必要)。

Bitmapから文字列へ

DataFrameの例は以下の通りです。doris.read.bitmap-to-stringをtrueに設定してください。具体的な結果形式については、オプション定義を参照してください。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-string","true")
.load()

Bitmap to base64

DataFrame の例は以下の通りです。doris.read.bitmap-to-base64 を true に設定してください。具体的な結果フォーマットについては、オプション定義を参照してください。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-base64","true")
.load()
  1. DataFrame モードでの書き込み時にエラーが発生する: org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.

    保存モードをappendに追加する必要があります。

    resultDf.format("doris")
    .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    // your own options
    .mode(SaveMode.Append)
    .save()