メインコンテンツまでスキップ
バージョン: 2.1

Spark Doris Connector

Spark Doris ConnectorはSparkを通じてDorisに格納されたデータの読み取りとDorisへのデータ書き込みをサポートできます。

Github: https://github.com/apache/doris-spark-connector

  • RDDDataFrameSpark SQLを通じてDorisからバッチモードでのデータ読み取りをサポート。DataFrameまたはSpark SQLの使用を推奨
  • DataFrame APIとSpark SQLを使用してバッチまたはストリーミングモードでDorisへのデータ書き込みをサポート
  • DorisTableをDataFrameまたはRDDにマップ可能、DataFrameの使用を推奨
  • Doris側でのデータフィルタリングの完了をサポートし、データ転送量を削減

バージョン互換性

ConnectorSparkDorisJavaScala
25.2.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.1.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.13.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.03.5 - 3.1, 2.41.0 +82.12, 2.11
24.0.03.5 ~ 3.1, 2.41.0 +82.12, 2.11
1.3.23.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.682.12, 2.11
1.3.13.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.082.12, 2.11
1.3.03.4 ~ 3.1, 2.4, 2.31.0 ~ 2.1.082.12, 2.11
1.2.03.2, 3.1, 2.31.0 ~ 2.0.282.12, 2.11
1.1.03.2, 3.1, 2.31.0 ~ 1.2.882.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

使用方法

Maven

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-spark-3.5</artifactId>
<version>25.2.0</version>
</dependency>

::: tip

バージョン24.0.0以降、Dorisコネクタパッケージの命名規則が調整されました:

  1. Scalaバージョン情報を含まなくなりました。
  2. Spark 2.xバージョンについては、spark-doris-connector-spark-2という名前のパッケージを統一的に使用し、デフォルトではScala 2.11バージョンに基づいてのみコンパイルします。Scala 2.12バージョンが必要な場合は、自分でコンパイルしてください。
  3. Spark 3.xバージョンについては、特定のSparkバージョンに応じてspark-doris-connector-spark-3.xという名前のパッケージを使用します。Spark 3.0バージョンに基づくアプリケーションはspark-doris-connector-spark-3.1パッケージを使用できます。

:::

注意

  1. 異なるSparkおよびScalaバージョンに応じて、対応するConnectorバージョンを置き換えてください。

  2. こちらから関連するバージョンのjarパッケージをダウンロードすることもできます。

コンパイル

コンパイル時は、直接sh build.shを実行できます。詳細については、こちらを参照してください。

コンパイルが成功すると、ターゲットjarパッケージがdistディレクトリに生成されます(例:spark-doris-connector-spark-3.5-25.2.0.jar)。このファイルをSparkClassPathにコピーしてSpark-Doris-Connectorを使用します。例えば、SparkLocalモードで実行されている場合は、このファイルをjars/フォルダに配置します。SparkYarnクラスタモードで実行されている場合は、このファイルを事前デプロイパッケージに配置します。 また、以下も可能です

ソースコードディレクトリで実行:

sh build.sh

プロンプトに従って、コンパイルが必要なScalaおよびSparkバージョンを入力します。

コンパイルが成功すると、ターゲットjarパッケージがdistディレクトリに生成されます(例:spark-doris-connector-spark-3.5-25.2.0.jar)。 このファイルをSparkClassPathにコピーしてSpark-Doris-Connectorを使用します。

例えば、SparkLocalモードで実行されている場合は、このファイルをjars/フォルダに配置します。SparkYarnクラスタモードで実行されている場合は、このファイルを事前デプロイパッケージに配置します。

例えば、spark-doris-connector-spark-3.5-25.2.0.jarをhdfsにアップロードし、hdfs上のJarパッケージパスをspark.yarn.jarsパラメータに追加します


1. Upload `spark-doris-connector-spark-3.5-25.2.0.jar` to hdfs.

hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.2.0.jar /spark-jars/

2. Add the `spark-doris-connector-spark-3.5-25.2.0.jar` dependency in the cluster.
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.2.0.jar

Example

バッチ読み取り

RDD

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()

DataFrame

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

pySpark

dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
// show 5 lines data
dorisSparkDF.show(5)

Arrow Flight SQLによる読み取り

バージョン24.0.0以降、Arrow Flight SQLを介してデータを読み取ることができます(Dorisバージョン2.1.0以上が必要です)。

doris.read.modeをarrowに設定し、doris.read.arrow-flight-sql.portをFEによって設定されたArrow Flight SQLポートに設定してください。

サーバー設定については、Arrow Flight SQLに基づく高速データ伝送リンクを参照してください。

val df = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.user", "$YOUR_DORIS_USERNAME")
.option("doris.password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.mode", "arrow")
.option("doris.read.arrow-flight-sql.port", "12345")
.load()

df.show()

Batch Write

DataFrame

val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//other options
//specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
// Support setting Overwrite mode to overwrite data
// .mode(SaveMode.Overwrite)
.save()

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- insert into select
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
-- insert overwrite
INSERT OVERWRITE SELECT * FROM YOUR_TABLE;

Streaming Write

DataFrame

構造化データの書き込み
val df = spark.readStream.format("your_own_stream_source").load()

df.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.start()
.awaitTermination()
直接書き込み

データストリームの最初のデータ列がDorisTable構造に準拠したフォーマット済みデータである場合、例えば同じ列順序のCSVフォーマットデータや同じフィールド名のJSONフォーマットデータの場合、doris.sink.streaming.passthroughオプションをtrueに設定することで、DataFrameに変換することなく直接Dorisに書き込むことができます。

kafkaを例にとります。

そして、書き込み対象のTable構造が以下であると仮定します:

CREATE TABLE `t2` (
`c0` int NULL,
`c1` varchar(10) NULL,
`c2` date NULL
) ENGINE=OLAP
DUPLICATE KEY(`c0`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

メッセージの値は{"c0":1,"c1":"a","dt":"2024-01-01"}のjson形式です。

val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.load()

// Select the value of the message as the first column of the DataFrame.
kafkaSource.selectExpr("CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// Set this option to true, and the first column will be written directly without processing.
.option("doris.sink.streaming.passthrough", "true")
.option("doris.sink.properties.format", "json")
.start()
.awaitTermination()

JSON形式で書き込む

doris.sink.properties.formatをjsonに設定する

val df = spark.readStream.format("your_own_stream_source").load() 
df.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.sink.properties.format", "json")
.save()

Spark Doris カタログ

バージョン24.0.0以降、Spark Catalogを通じてDorisへのアクセスをサポートしています。

カタログ Config

KeyRequiredComment
spark.sql.catalog.your_catalog_nametruecatalogプロバイダーのクラス名を設定します。Dorisで有効な値はorg.apache.doris.spark.catalog.DorisTableCatalogのみです
spark.sql.catalog.your_catalog_name.doris.fenodestruefe_ip:fe_http_portの形式でDoris FEノードを設定します
spark.sql.catalog.your_catalog_name.doris.query.portfalseDoris FEクエリポートを設定します。spark.sql.catalog.your_catalog_name.doris.fe.auto.fetchがtrueに設定されている場合、このオプションは不要です
spark.sql.catalog.your_catalog_name.doris.usertrueDorisユーザーを設定します
spark.sql.catalog.your_catalog_name.doris.passwordtrueDorisパスワードを設定します
spark.sql.defaultCatalogfalseSpark SQLデフォルトcatalogを設定します
ヒント

DataFrameとSpark SQLに適用されるすべてのコネクタパラメータをcatalogに設定できます。
例えば、json形式でデータを書き込みたい場合は、オプションspark.sql.catalog.your_catalog_name.doris.sink.properties.formatjsonに設定できます。

DataFrame

val conf = new SparkConf()
conf.set("spark.sql.catalog.your_catalog_name", "org.apache.doris.spark.catalog.DorisTableCatalog")
conf.set("spark.sql.catalog.your_catalog_name.doris.fenodes", "192.168.0.1:8030")
conf.set("spark.sql.catalog.your_catalog_name.doris.query.port", "9030")
conf.set("spark.sql.catalog.your_catalog_name.doris.user", "root")
conf.set("spark.sql.catalog.your_catalog_name.doris.password", "")
val spark = builder.config(conf).getOrCreate()
spark.sessionState.catalogManager.setCurrentCatalog("your_catalog_name")

// show all databases
spark.sql("show databases")

// use databases
spark.sql("use your_doris_db")

// show tables in test
spark.sql("show tables")

// query table
spark.sql("select * from your_doris_table")

// write data
spark.sql("insert into your_doris_table values(xxx)")

Spark SQL

必要な設定でSpark SQL CLIを開始します。

spark-sql \
--conf "spark.sql.catalog.your_catalog_name=org.apache.doris.spark.catalog.DorisTableCatalog" \
--conf "spark.sql.catalog.your_catalog_name.doris.fenodes=192.168.0.1:8030" \
--conf "spark.sql.catalog.your_catalog_name.doris.query.port=9030" \
--conf "spark.sql.catalog.your_catalog_name.doris.user=root" \
--conf "spark.sql.catalog.your_catalog_name.doris.password=" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

Spark SQL CLIでクエリを実行します。

-- show all databases
show databases;

-- use databases
use your_doris_db;

-- show tables in test
show tables;

-- query table
select * from your_doris_table;

-- write data
insert into your_doris_table values(xxx);
insert into your_doris_table select * from your_source_table;

-- access table with full name
select * from your_catalog_name.your_doris_db.your_doris_table;
insert into your_catalog_name.your_doris_db.your_doris_table values(xxx);
insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table;

構成

General

Keyデフォルト値Comment
doris.fenodes--Doris FE httpアドレス、複数のアドレスをサポート、カンマで区切り
doris.table.identifier--DorisTable識別子、例:db1.tbl1
doris.user--Dorisユーザー名
doris.password空文字列Dorisパスワード
doris.request.retries3Dorisへのリクエスト送信の再試行回数
doris.request.connect.timeout.ms30000Dorisへのリクエスト送信時の接続タイムアウト
doris.request.read.timeout.ms30000Dorisへのリクエスト送信時の読み取りタイムアウト
doris.request.query.timeout.s21600dorisクエリのタイムアウト時間、デフォルトは6時間、-1はタイムアウト制限なしを意味する
doris.request.tablet.size1RDD Partitionに対応するDoris Tabletの数。この値を小さく設定するほど、より多くのパーティションが生成される。これによりSpark側の並列性が向上するが、同時にDorisにより大きな負荷をかけることになる。
doris.read.field--DorisTableの列名リスト、カンマで区切り
doris.batch.size4064一度にBEからデータを読み取る最大行数。この値を増やすと、SparkとDoris間の接続数を減らすことができる。これによりネットワーク遅延による余分な時間オーバーヘッドを削減できる。
doris.exec.mem.limit8589934592単一クエリのメモリ制限。デフォルトは8GB、バイト単位。
doris.write.fields--DorisTableに書き込むフィールド(またはフィールドの順序)を指定、フィールドはカンマで区切り。
デフォルトでは、すべてのフィールドがDorisTableフィールドの順序で書き込まれる。
doris.sink.batch.size500000単一のBE書き込みでの最大行数
doris.sink.max-retries0BE書き込み後の再試行回数。バージョン1.3.0以降、デフォルト値は0で、デフォルトで再試行を実行しないことを意味する。このパラメータが0より大きく設定された場合、バッチレベルの失敗再試行が実行され、doris.sink.batch.sizeの設定サイズのデータがSpark Executorメモリにキャッシュされる。メモリ割り当てを適切に増やす必要がある場合がある。
doris.sink.retry.interval.ms10000再試行回数設定後の各再試行間の間隔、ミリ秒単位
doris.sink.properties.format--stream loadのデータ形式。
サポートされる形式:csv、json、arrow
その他のマルチパラメータの詳細
doris.sink.properties.*--Stream Loadのインポートパラメータ。
例:
列区切り文字の指定:'doris.sink.properties.column_separator' = ','
その他のパラメータの詳細
doris.sink.task.partition.size--書き込みタスクに対応するパーティション数。フィルタリングやその他の操作後、Spark RDDで書き込まれるパーティション数は多くなる可能性があるが、各Partitionに対応するレコード数は比較的少なく、書き込み頻度の増加とコンピューティングリソースの浪費を引き起こす。この値を小さく設定するほど、Doris書き込み頻度が減り、Dorisマージ圧力も少なくなる。一般的にdoris.sink.task.use.repartitionと組み合わせて使用する。
doris.sink.task.use.repartitionfalserepartitionモードを使用してDorisが書き込むパーティション数を制御するかどうか。デフォルト値はfalseで、coalesceが使用される(注意:書き込み前にSparkアクションがない場合、全体の計算の並列性が低くなる)。trueに設定された場合、repartitionが使用される(注意:shuffleのコストで最終パーティション数を設定できる)。
doris.sink.batch.interval.ms0各バッチsinkの間隔時間、ミリ秒単位。
doris.sink.enable-2pcfalse2段階コミットを有効にするかどうか。有効にすると、ジョブ終了時にトランザクションがコミットされ、一部のタスクが失敗した場合はすべてのプリコミットトランザクションがロールバックされる。
doris.sink.auto-redirecttrueStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFEを通じて書き込みを行い、BE情報を明示的に取得しなくなる。
doris.enable.httpsfalseFE Httpsリクエストを有効にするかどうか。
doris.https.key-store-path-Httpsキーストアパス。
doris.https.key-store-typeJKSHttpsキーストアタイプ。
doris.https.key-store-password-Httpsキーストアパスワード。
doris.read.modethriftDoris読み取りモード、thriftarrowがオプション。
doris.read.arrow-flight-sql.port-Doris FEのArrow Flight SQLポート。doris.read.modearrowの場合、Arrow Flight SQL経由でデータを読み取るために使用される。サーバー設定については、Arrow Flight SQLベースの高速データ伝送リンクを参照
doris.sink.label.prefixspark-dorisStream Loadモードで書き込み時のインポートラベルプレフィックス。
doris.thrift.max.message.size2147483647Thrift経由でデータを読み取る際のメッセージの最大サイズ。
doris.fe.auto.fetchfalseFE情報を自動取得するかどうか。trueに設定すると、doris.fenodesで設定されたノードに従ってすべてのFEノード情報が要求される。複数ノードを設定する必要がなく、doris.read.arrow-flight-sql.portdoris.query.portを個別に設定する必要もない。
doris.read.bitmap-to-stringfalseBitmapタイプを配列インデックスで構成された文字列に変換して読み取るかどうか。具体的な結果形式については、関数定義BITMAP_TO_STRINGを参照。
doris.read.bitmap-to-base64falseBitmapタイプをBase64エンコード文字列に変換して読み取るかどうか。具体的な結果形式については、関数定義BITMAP_TO_BASE64を参照。
doris.query.port-Doris FEクエリポート、Catalogの上書きとメタデータ取得に使用。

SQL & Dataframe 構成

Keyデフォルト値Comment
doris.filter.query.in.max.count100述語プッシュダウンにおいて、in式の値リストの要素の最大数。この数を超える場合、in式条件フィルタリングはSpark側で処理される。

Structured Streaming 構成

Keyデフォルト値Comment
doris.sink.streaming.passthroughfalse処理を行わずに最初の列の値を直接書き込む。

RDD 構成

Keyデフォルト値Comment
doris.request.auth.user--Dorisユーザー名
doris.request.auth.password--Dorisパスワード
doris.filter.query--クエリのフィルタ式、Dorisに透過的に送信される。Dorisはこの式を使用してソース側データフィルタリングを完了する。

DorisからSparkへのデータ型マッピング

Doris タイプSpark タイプ
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDecimalType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
JSONDataTypes.StringType
VARIANTDataTypes.StringType
TIMEDataTypes.DoubleType
HLLDataTypes.StringType
BitmapDataTypes.StringType

SparkからDorisへのデータ型マッピング

Spark タイプDoris タイプ
BooleanTypeBOOLEAN
ShortTypeSMALLINT
IntegerTypeINT
LongTypeBIGINT
FloatTypeFLOAT
DoubleTypeDOUBLE
DecimalTypeDECIMAL
StringTypeVARCHAR/STRING
DateTypeDATE
TimestampTypeDATETIME
ArrayTypeARRAY
MapTypeMAP/JSON
StructTypeSTRUCT/JSON
ヒント

バージョン24.0.0以降、Bitmapタイプの戻り値の型はstring型で、デフォルトの戻り値は文字列値Read unsupportedです。

FAQ

  1. Bitmapタイプの書き込み方法

    Spark SQLでinsert intoを通じてデータを書き込む際、dorisのターゲットTableにBITMAPまたはHLLタイプのデータが含まれている場合、オプションdoris.ignore-typeを対応するタイプに設定し、doris.write.fieldsを通じて列をマッピングする必要があります。使用方法は以下のとおりです:

    BITMAP

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
    "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
    "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "user"="$YOUR_DORIS_USERNAME",
    "password"="$YOUR_DORIS_PASSWORD"
    "doris.ignore-type"="bitmap",
    "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
    );

HLL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
"doris.ignore-type"="hll",
"doris.write.fields"="col1,hll_col1=hll_hash(col1)"
);
```
ヒント

バージョン24.0.0以降、doris.ignore-typeは非推奨となり、書き込み時にこのパラメータを追加する必要はありません。

  1. overwriteを使用して書き込む方法

    バージョン1.3.0以降、overwriteモードでの書き込みがサポートされています(Table全体レベルでのデータ上書きのみサポート)。具体的な使用方法は以下の通りです:

    DataFrame

    resultDf.format("doris")
    .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    // your own options
    .mode(SaveMode.Overwrite)
    .save()

SQL

```sparksql
INSERT OVERWRITE your_target_table SELECT * FROM your_source_table
```

3. Bitmap型の読み取り方法

バージョン24.0.0以降、Arrow Flight SQLを通じて変換されたBitmapデータの読み取りをサポートしています(Dorisバージョン >= 2.1.0が必要です)。

Bitmapから文字列へ

DataFrameの例は以下の通りです。doris.read.bitmap-to-stringをtrueに設定してください。具体的な結果フォーマットについては、オプション定義を参照してください。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-string","true")
.load()

Bitmap to base64

DataFrameの例は以下の通りです。doris.read.bitmap-to-base64をtrueに設定してください。具体的な結果形式については、オプション定義を参照してください。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-base64","true")
.load()
  1. DataFrame モードでの書き込み時にエラーが発生する場合: org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.

    save mode を append に追加する必要があります。

    resultDf.format("doris")
    .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    // your own options
    .mode(SaveMode.Append)
    .save()