ArangoDB Datasource for Apache Spark
ArangoDB Datasource for Apache Spark allows batch reading and writing Spark DataFrame data from and to ArangoDB, by implementing the Spark Data Source V2 API.
Reading tasks are parallelized according to the number of shards of the related ArangoDB collection, while write tasks are parallelized according to the partitions of the source DataFrame. Network traffic is load balanced across the available DB Coordinators.
Filter predicates and column selections are pushed down to the DB by dynamically generating AQL queries, which fetch only the strictly required data, thus saving network and computational resources on both the Spark and the DB side.
The connector is usable from all client languages supported by Spark: Scala, Python, Java, and R.
This library works with all non-EOLed ArangoDB versions.
Supported versions
There are several variants of this library, each one compatible with different Spark and Scala versions:
- `com.arangodb:arangodb-spark-datasource-2.4_2.11` (Spark 2.4, Scala 2.11)
- `com.arangodb:arangodb-spark-datasource-2.4_2.12` (Spark 2.4, Scala 2.12)
- `com.arangodb:arangodb-spark-datasource-3.1_2.12` (Spark 3.1, Scala 2.12)
- `com.arangodb:arangodb-spark-datasource-3.2_2.12` (Spark 3.2, Scala 2.12)
- `com.arangodb:arangodb-spark-datasource-3.2_2.13` (Spark 3.2, Scala 2.13)
In the following sections, the `${sparkVersion}` and `${scalaVersion}` placeholders refer to the Spark and Scala versions, respectively.
Setup
To import ArangoDB Datasource for Apache Spark in a Maven project:

```xml
<dependencies>
  <dependency>
    <groupId>com.arangodb</groupId>
    <artifactId>arangodb-spark-datasource-${sparkVersion}_${scalaVersion}</artifactId>
    <version>x.y.z</version>
  </dependency>
</dependencies>
```
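For sbt projects, an equivalent dependency declaration would look like the following sketch, picking the Spark 3.2 / Scala 2.12 variant as an example:

```scala
// build.sbt (sketch): substitute the variant and version you need
libraryDependencies += "com.arangodb" % "arangodb-spark-datasource-3.2_2.12" % "x.y.z"
```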
To use in an external Spark cluster, submit your application with the following parameter:
```
--packages="com.arangodb:arangodb-spark-datasource-${sparkVersion}_${scalaVersion}:x.y.z"
```
General Configuration
- `user`: db user, `root` by default
- `password`: db password
- `endpoints`: list of Coordinators, e.g. `c1:8529,c2:8529` (required)
- `acquireHostList`: acquire the list of all known hosts in the cluster (`true` or `false`), `false` by default
- `protocol`: communication protocol (`vst` or `http`), `http` by default
- `contentType`: content type for driver communication (`json` or `vpack`), `json` by default
- `timeout`: driver connect and request timeout in ms, `300000` by default
- `ssl.enabled`: SSL secured driver connection (`true` or `false`), `false` by default
- `ssl.cert.value`: Base64 encoded certificate
- `ssl.cert.type`: certificate type, `X.509` by default
- `ssl.cert.alias`: certificate alias name, `arangodb` by default
- `ssl.algorithm`: trust manager algorithm, `SunX509` by default
- `ssl.keystore.type`: keystore type, `jks` by default
- `ssl.protocol`: SSLContext protocol, `TLS` by default
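For example, a minimal set of general options could look like this (a sketch; credentials and endpoints are placeholders to adapt to your deployment):

```scala
// General connection options; all keys come from the list above.
val baseOptions: Map[String, String] = Map(
  "user" -> "root",
  "password" -> "test",
  "endpoints" -> "c1:8529,c2:8529",
  "contentType" -> "json",
  "timeout" -> "300000"
)
```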
SSL
To use TLS secured connections to ArangoDB, set `ssl.enabled` to `true` and either:
- provide a Base64 encoded certificate as the `ssl.cert.value` configuration entry and optionally set the other `ssl.*` entries, or
- start the Spark driver and workers with a properly configured JVM default TrustStore
Supported deployment topologies
The connector can work with single server, cluster, and active failover deployments of ArangoDB.
Batch Read
The connector implements support for batch reading from an ArangoDB collection.
```scala
val df: DataFrame = spark.read
  .format("com.arangodb.spark")
  .options(options) // Map[String, String]
  .schema(schema)   // StructType
  .load()
```
The connector can read data from:
- a collection
- an AQL cursor (query specified by the user)
When reading data from a collection, the reading job is split into many Spark tasks, one for each shard in the ArangoDB source collection. The resulting Spark DataFrame has the same number of partitions as the number of shards in the ArangoDB collection, each one containing data from the respective collection shard. The reading tasks consist of AQL queries that are load balanced across all the available ArangoDB Coordinators. Each query is related to only one shard, therefore it is executed locally on the DB-Server holding that shard.
When reading data from an AQL cursor, the reading job cannot be partitioned or parallelized, so it is less scalable. This mode can be used for reading data coming from different tables, e.g. resulting from an AQL traversal query.
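A minimal sketch of a cursor-based read, assuming an existing SparkSession `spark` (as created in the example below); the AQL query, endpoint, and collection names are placeholders:

```scala
// The "query" option replaces "table"; the result is a single-partition
// DataFrame, and the schema is inferred from sampled documents if not
// provided explicitly.
val traversalDF = spark.read
  .format("com.arangodb.spark")
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529",
    "query" -> "FOR v IN 1..2 OUTBOUND 'users/alice' knows RETURN v"
  ))
  .load()
```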
Example
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val spark: SparkSession = SparkSession.builder()
  .appName("ArangoDBSparkDemo")
  .master("local[*]")
  .config("spark.driver.host", "127.0.0.1")
  .getOrCreate()

val usersDF: DataFrame = spark.read
  .format("com.arangodb.spark")
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users"
  ))
  .schema(new StructType(
    Array(
      StructField("likes", ArrayType(StringType, containsNull = false)),
      StructField("birthday", DateType, nullable = true),
      StructField("gender", StringType, nullable = false),
      StructField("name", StructType(
        Array(
          StructField("first", StringType, nullable = true),
          StructField("last", StringType, nullable = false)
        )
      ), nullable = true)
    )
  ))
  .load()

usersDF.filter(col("birthday") === "1982-12-15").show()
```
Read Configuration
- `database`: database name, `_system` by default
- `table`: datasource ArangoDB collection name, ignored if `query` is specified. Either `table` or `query` is required.
- `query`: custom AQL read query. If set, `table` is ignored. Either `table` or `query` is required.
- `batchSize`: reading batch size, `10000` by default
- `sampleSize`: sample size prefetched for schema inference, only used if the read schema is not provided, `1000` by default
- `fillBlockCache`: specifies whether the query should store the data it reads in the RocksDB block cache (`true` or `false`), `false` by default
- `stream`: specifies whether the query should be executed lazily, `true` by default
- `mode`: allows setting a mode for dealing with corrupt records during parsing:
  - `PERMISSIVE`: in case of a corrupted record, the malformed string is put into a field configured by `columnNameOfCorruptRecord`, and malformed fields are set to `null`. To keep corrupt records, a user can set a string type field named `columnNameOfCorruptRecord` in a user-defined schema. If the schema does not have the field, corrupt records are dropped during parsing. When inferring a schema, the `columnNameOfCorruptRecord` field is implicitly added to the output schema.
  - `DROPMALFORMED`: ignores the whole corrupted record
  - `FAILFAST`: throws an exception in case of corrupted records
- `columnNameOfCorruptRecord`: allows renaming the field holding the malformed string created by the `PERMISSIVE` mode
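For example, a tolerant read could be configured as follows (a sketch; the schema and the `_corrupt_record` column name are illustrative):

```scala
import org.apache.spark.sql.types._

// Corrupt records land in the "_corrupt_record" column instead of
// failing the read; the column must be declared in the schema.
val tolerantDF = spark.read
  .format("com.arangodb.spark")
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529",
    "table" -> "users",
    "mode" -> "PERMISSIVE",
    "columnNameOfCorruptRecord" -> "_corrupt_record"
  ))
  .schema(StructType(Array(
    StructField("name", StringType, nullable = true),
    StructField("_corrupt_record", StringType, nullable = true)
  )))
  .load()
```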
Predicate and Projection Pushdown
The connector can convert some Spark SQL filter predicates into AQL predicates and push their execution down to the data source. In this way, ArangoDB can apply the filters and return only the matching documents.
The following filter predicates (implementations of `org.apache.spark.sql.sources.Filter`) are pushed down:
- `And`
- `Or`
- `Not`
- `EqualTo`
- `EqualNullSafe`
- `IsNull`
- `IsNotNull`
- `GreaterThan`
- `GreaterThanOrEqual`
- `LessThan`
- `LessThanOrEqual`
- `StringStartsWith`
- `StringEndsWith`
- `StringContains`
- `In`
Furthermore, the connector will push down the subset of columns required by the Spark query, so that only the relevant document fields are returned.
Predicate and projection pushdowns are only performed while reading an ArangoDB collection (set by the table
configuration parameter). In case of a batch read from a custom query (set by the query
configuration parameter), no pushdown optimizations are performed.
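For instance, with the collection-backed `usersDF` from the read example above, both the predicate and the column selection in the following query would be evaluated server-side (a sketch):

```scala
import org.apache.spark.sql.functions.col

// The "gender" predicate and the projection on "name" and "gender" are
// translated into AQL, so only matching documents and the requested
// fields travel over the network.
usersDF
  .select("name", "gender")
  .filter(col("gender") === "female")
  .show()
```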
Read Resiliency
The data of each partition is read using an AQL cursor. If any error occurs, the read task of the related partition will fail. Depending on the Spark configuration, the task could be retried.
Batch Write
The connector implements support for batch writing to an ArangoDB collection.
```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

val df: DataFrame = //...
df.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users"
  ))
  .save()
```
Write tasks are load balanced across the available ArangoDB Coordinators. The data saved into ArangoDB is sharded according to the target collection definition, which is independent of the Spark DataFrame partitioning.
SaveMode
On writing, `org.apache.spark.sql.SaveMode` is used to specify the expected behavior in case the target collection already exists.

The Spark 2.4 implementation supports all save modes with the following semantics:
- `Append`: the target collection is created if it does not exist.
- `Overwrite`: the target collection is created if it does not exist, otherwise it is truncated. Use it in combination with the `confirmTruncate` write configuration parameter.
- `ErrorIfExists`: the target collection is created if it does not exist, otherwise an `AnalysisException` is thrown.
- `Ignore`: the target collection is created if it does not exist, otherwise no write is performed.

The Spark 3 implementations support:
- `Append`: the target collection is created if it does not exist.
- `Overwrite`: the target collection is created if it does not exist, otherwise it is truncated. Use it in combination with the `confirmTruncate` write configuration parameter.

In the Spark 3 implementations, the `ErrorIfExists` and `Ignore` save modes behave the same as `Append`.

Use the `overwriteMode` write configuration parameter to specify the document overwrite behavior in case a document with the same `_key` already exists.
Write Configuration
- `table`: target ArangoDB collection name (required)
- `batchSize`: writing batch size, `10000` by default
- `byteBatchSize`: byte batch size threshold, only considered for `contentType=json`, `8388608` by default (8 MB)
- `table.shards`: number of shards of the created collection (in case of the `Append` or `Overwrite` SaveMode)
- `table.type`: type (`document` or `edge`) of the created collection (in case of the `Append` or `Overwrite` SaveMode), `document` by default
- `waitForSync`: specifies whether to wait until the documents have been synced to disk (`true` or `false`), `false` by default
- `confirmTruncate`: confirms to truncate the table when using the `Overwrite` SaveMode, `false` by default
- `overwriteMode`: configures the behavior in case a document with the specified `_key` value already exists. It is only considered for the `Append` SaveMode.
  - `ignore` (default for SaveMode other than `Append`): the new document is not written
  - `replace`: the existing document is overwritten with the specified document value
  - `update`: the existing document is patched (partially updated) with the specified document value. The behavior can be further controlled via the `keepNull` and `mergeObjects` parameters. `keepNull` is also automatically set to `true`, so that `null` values are kept in the saved documents and not used to remove existing document fields (as in the default ArangoDB upsert behavior).
  - `conflict` (default for the `Append` SaveMode): a unique constraint violation error is returned, so that the insert operation fails
- `mergeObjects`: in case `overwriteMode` is set to `update`, controls whether objects (not arrays) are merged:
  - `true` (default): objects are merged
  - `false`: existing document fields are overwritten
- `keepNull`: in case `overwriteMode` is set to `update`:
  - `true` (default): `null` values are saved within the document
  - `false`: `null` values are used to delete the corresponding existing attributes
- `retry.maxAttempts`: max attempts for retrying write requests in case they are idempotent, `10` by default
- `retry.minDelay`: min delay in ms between write request retries, `0` by default
- `retry.maxDelay`: max delay in ms between write request retries, `0` by default
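As an example, an upsert-style write could combine these options as follows (a sketch, reusing the DataFrame `df` from the batch write example; it assumes the source data carries a `_key` column):

```scala
import org.apache.spark.sql.SaveMode

// Existing documents with a matching "_key" are patched; objects are
// merged and null values are kept (the connector forces keepNull=true
// for overwriteMode=update).
df.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529,c2:8529,c3:8529",
    "table" -> "users",
    "overwriteMode" -> "update",
    "mergeObjects" -> "true"
  ))
  .save()
```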
Write Resiliency
The data of each partition is saved in batches using the ArangoDB API for inserting multiple documents. This operation is not atomic, therefore some documents could be successfully written to the database, while others could fail. To make the job more resilient to temporary errors (e.g. connectivity problems), in case of failure the request is retried (against another Coordinator) if the provided configuration allows idempotent requests, namely if:
- the schema of the DataFrame has a non-nullable `_key` field, and
- `overwriteMode` is set to one of the following values: `replace`, `ignore`, or `update` with `keepNull=true`
A failing batch-saving request is retried once for every Coordinator. After that, if still failing, the write task for the related partition is aborted. According to the Spark configuration, the task can be retried and rescheduled on a different executor, if the provided write configuration allows idempotent requests (as described above).
If a task ultimately fails and is aborted, the entire write job is aborted as well. Depending on the `SaveMode` configuration, the following cleanup operations are performed:
- `Append`: no cleanup is performed and the underlying data source may require manual cleanup. A `DataWriteAbortException` is thrown.
- `Overwrite`: the target collection is truncated.
- `ErrorIfExists`: the target collection is dropped.
- `Ignore`: if the collection did not exist before, it is dropped; otherwise, nothing is done.
Write requirements
When writing to an edge collection (`table.type=edge`), the schema of the DataFrame being written must have:
- a non-nullable string field named `_from`, and
- a non-nullable string field named `_to`
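A minimal sketch satisfying these requirements (collection and document keys are placeholders; note the explicit schema, since inferred string columns would be nullable):

```scala
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types._

// Declare "_from" and "_to" as non-nullable string fields explicitly.
val edgeSchema = StructType(Array(
  StructField("_from", StringType, nullable = false),
  StructField("_to", StringType, nullable = false)
))
val edges = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("users/alice", "users/bob"))),
  edgeSchema
)

edges.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529",
    "table" -> "knows",
    "table.type" -> "edge"
  ))
  .save()
```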
Write Limitations
- Batch writes are not performed atomically, so in some cases (e.g. with `overwriteMode: conflict`) several documents in the batch may be written while others may return an exception (e.g. due to a conflicting key).
- Writing records with the `_key` attribute is only allowed on collections sharded by `_key`.
- In case of the `Append` save mode, failed jobs cannot be rolled back and the underlying data source may require manual cleanup.
- Speculative execution of tasks only works for idempotent write configurations. See Write Resiliency for more details.
- Speculative execution of tasks can cause concurrent writes to the same documents, resulting in write-write conflicts or lock timeouts.
Mapping Configuration
Serialization and deserialization of Spark DataFrame Rows to and from JSON (or VelocyPack) can be customized using the following options:
- `ignoreNullFields`: whether to ignore null fields during serialization, `false` by default (only supported in Spark 3.x)
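For example (a sketch, reusing the DataFrame `df` and options from the batch write example):

```scala
import org.apache.spark.sql.SaveMode

// Null fields are omitted from the serialized documents instead of
// being stored as explicit null values (Spark 3.x only).
df.write
  .format("com.arangodb.spark")
  .mode(SaveMode.Append)
  .options(Map(
    "password" -> "test",
    "endpoints" -> "c1:8529",
    "table" -> "users",
    "ignoreNullFields" -> "true"
  ))
  .save()
```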
Supported Spark data types
The following Spark SQL data types (subtypes of `org.apache.spark.sql.types.DataType`) are supported for reading, writing and filter pushdown:
- Numeric types:
  - `ByteType`
  - `ShortType`
  - `IntegerType`
  - `LongType`
  - `FloatType`
  - `DoubleType`
- String types:
  - `StringType`
- Boolean types:
  - `BooleanType`
- Datetime types:
  - `TimestampType`
  - `DateType`
- Complex types:
  - `ArrayType`
  - `MapType` (only with key type `StringType`)
  - `StructType`
Connect to the ArangoGraph Insights Platform
To connect to SSL secured deployments using an X.509 Base64 encoded CA certificate (ArangoGraph):
```scala
val options = Map(
  "database" -> "<dbname>",
  "user" -> "<username>",
  "password" -> "<passwd>",
  "endpoints" -> "<endpoint>:<port>",
  "ssl.cert.value" -> "<base64 encoded CA certificate>",
  "ssl.enabled" -> "true",
  "table" -> "<table>"
)

// read
val myDF = spark.read
  .format("com.arangodb.spark")
  .options(options)
  .load()

// write
import org.apache.spark.sql.DataFrame
val df: DataFrame = //...
df.write
  .format("com.arangodb.spark")
  .options(options)
  .save()
```
Current limitations
- For `contentType=vpack`, implicit deserialization casts don't work well, e.g. reading a document having a field with a numeric value whereas the related read schema requires a string value for such a field.
- Date and timestamp fields are interpreted to be in the UTC time zone.
- In Spark 2.4, partial results are not supported for corrupted records in batch reading: all fields other than the one configured by `columnNameOfCorruptRecord` are set to `null` (SPARK-26303).
- In read jobs using `stream=true` (default), possible AQL warnings are only logged at the end of each read task (BTS-671).
- Spark SQL `DecimalType` fields are not supported in write jobs when using `contentType=json`.
- Spark SQL `DecimalType` values are written to the database as strings.
- `byteBatchSize` is only considered for `contentType=json` (DE-226).
Demo
Check out our demo to learn more about ArangoDB Datasource for Apache Spark.