Join us on
Star us on
Get Started
Slack
GitHub
Get Started
v2.5 (latest) v2.2 (stable) v2.1 (earlier version) v2.0 (earlier version) v1.3 (earlier version)
  • GET STARTED
    • Quick start
      • 1. Install YugabyteDB
      • 2. Create a local cluster
      • 3. Explore YSQL
      • 4. Build an application
        • Java
        • NodeJS
        • Go
        • Python
        • Ruby
        • C#
        • PHP
        • C++
        • C
    • Introduction
    • Explore core
      • 1. Linear scalability
      • 2. Fault tolerance
      • 3. Global distribution
      • 4. Auto sharding
      • 5. Tunable reads
      • 6. Observability
  • USER GUIDES
    • Develop
      • Learn app development
        • 1. SQL vs NoSQL
        • 2. Data modeling
        • 3. Data types
        • 4. ACID transactions
        • 5. Aggregations
        • 6. Batch operations
        • 7. Date and time
        • 8. Strings and text
      • Ecosystem integrations
        • Apache Kafka
        • Apache Spark
        • JanusGraph
        • KairosDB
        • Presto
        • Metabase
      • Real-world examples
        • E-Commerce App
        • IoT Fleet Management
        • Retail Analytics
      • Explore sample applications
    • Deploy
      • Checklist
      • Manual deployment
        • 1. System configuration
        • 2. Install software
        • 3. Start YB-Masters
        • 4. Start YB-TServers
        • 5. Verify deployment
      • Kubernetes
        • Helm Chart
        • Helm configuration
        • Local SSD
      • Docker
      • Public clouds
        • Amazon Web Services
        • Google Cloud Platform
        • Microsoft Azure
      • Pivotal Cloud Foundry
      • Yugabyte Platform
        • 1. Prepare cloud environment
        • 2. Install Admin Console
        • 3. Configure Admin Console
        • 4. Configure Cloud Providers
    • Benchmark
      • Performance
      • YCSB
      • Large datasets
    • Secure
      • Security checklist
      • Authentication
      • Authorization
        • 1. RBAC Model
        • 2. Create Roles
        • 3. Grant permissions
      • TLS encryption
        • 1. Prepare nodes
        • 2. Server-server encryption
        • 3. Client-server encryption
        • 4. Connect to cluster
      • Encryption at Rest
    • Manage
      • Backup and restore
        • Backing up data
        • Restoring data
      • Data migration
        • Bulk import
        • Bulk export
      • Change cluster config
      • Upgrade deployment
      • Diagnostics reporting
      • Yugabyte Platform
        • Create universe - Multi-zone
        • Create universe - Multi-region
        • Edit universe
        • Edit config flags
        • Health checking and alerts
        • Create and edit instance tags
        • Node status and actions
        • Read replicas
        • Back up and restore
        • Upgrade universe
        • Delete universe
    • Troubleshoot
      • Troubleshooting overview
      • Cluster level issues
        • YCQL connection issues
        • YEDIS connection Issues
      • Node level issues
        • Check processes
        • Inspect logs
        • System statistics
      • Yugabyte Platform
        • Troubleshoot universes
  • REFERENCE
    • APIs
      • YSQL
        • Statements
          • ABORT
          • ALTER DATABASE
          • ALTER DOMAIN
          • ALTER TABLE
          • BEGIN
          • COMMENT
          • COMMIT
          • COPY
          • CREATE DATABASE
          • CREATE DOMAIN
          • CREATE INDEX
          • CREATE SCHEMA
          • CREATE SEQUENCE
          • CREATE TABLE
          • CREATE TABLE AS
          • CREATE TYPE
          • CREATE USER
          • CREATE VIEW
          • DEALLOCATE
          • DELETE
          • DROP DATABASE
          • DROP DOMAIN
          • DROP SEQUENCE
          • DROP TABLE
          • DROP TYPE
          • END
          • EXECUTE
          • EXPLAIN
          • GRANT
          • INSERT
          • LOCK
          • PREPARE
          • RESET
          • REVOKE
          • ROLLBACK
          • SELECT
          • SET
          • SET CONSTRAINTS
          • SET TRANSACTION
          • SHOW
          • SHOW TRANSACTION
          • TRUNCATE
          • UPDATE
        • Data types
          • Binary
          • Boolean
          • Character
          • Date-time
          • Json
          • Money
          • Numeric
          • Serial
          • UUID
        • Expressions
          • currval()
          • lastval()
          • nextval()
        • Keywords
        • Reserved Names
      • YCQL
        • Quick Start YCQL
        • ALTER KEYSPACE
        • ALTER ROLE
        • ALTER TABLE
        • CREATE INDEX
        • CREATE KEYSPACE
        • CREATE ROLE
        • CREATE TABLE
        • CREATE TYPE
        • DROP INDEX
        • DROP KEYSPACE
        • DROP ROLE
        • DROP TABLE
        • DROP TYPE
        • GRANT PERMISSION
        • GRANT ROLE
        • REVOKE PERMISSION
        • REVOKE ROLE
        • USE
        • INSERT
        • SELECT
        • UPDATE
        • DELETE
        • TRANSACTION
        • TRUNCATE
        • Simple Value
        • Subscript
        • Function Call
        • Operator Call
        • BLOB
        • BOOLEAN
        • MAP, SET, LIST
        • FROZEN
        • INET
        • Integer & Counter
        • Non-Integer
        • TEXT
        • Date & Time Types
        • UUID & TIMEUUID
        • JSONB
        • Date and time functions
    • CLIs
      • yb-ctl
      • yb-docker-ctl
      • yb-master
      • yb-tserver
      • ysqlsh
      • cqlsh
    • Sample data
      • Chinook
      • Northwind
      • PgExercises
      • SportsDB
    • Tools
      • TablePlus
  • RELEASES
    • Release history
      • v1.3.1
      • v1.3.0
      • v1.2.12
      • v1.2.11
      • v1.2.10
      • v1.2.9
      • v1.2.8
      • v1.2.6
      • v1.2.5
      • v1.2.4
  • CONCEPTS
    • Architecture
      • Design goals
      • Layered architecture
      • Basic concepts
        • Universe
        • YB-TServer
        • YB-Master
        • Acknowledgements
      • Query layer
        • Overview
      • DocDB store
        • Sharding
        • Replication
        • Persistence
        • Performance
      • DocDB transactions
        • Isolation Levels
        • Single row transactions
        • Distributed transactions
        • Transactional IO path
  • FAQ
    • Comparisons
      • CockroachDB
      • Google Cloud Spanner
      • MongoDB
      • FoundationDB
      • Amazon DynamoDB
      • Azure Cosmos DB
      • Apache Cassandra
      • Redis in-memory store
      • Apache HBase
    • Other FAQs
      • Product
      • Architecture
      • Yugabyte Platform
      • API compatibility
  • CONTRIBUTOR GUIDES
    • Get involved
  • Misc
    • YEDIS
      • Quick start
      • Develop
        • Client drivers
          • C
          • C++
          • C#
          • Go
          • Java
          • NodeJS
          • Python
      • API reference
        • APPEND
        • AUTH
        • CONFIG
        • CREATEDB
        • DELETEDB
        • LISTDB
        • SELECT
        • DEL
        • ECHO
        • EXISTS
        • EXPIRE
        • EXPIREAT
        • FLUSHALL
        • FLUSHDB
        • GET
        • GETRANGE
        • GETSET
        • HDEL
        • HEXISTS
        • HGET
        • HGETALL
        • HINCRBY
        • HKEYS
        • HLEN
        • HMGET
        • HMSET
        • HSET
        • HSTRLEN
        • HVALS
        • INCR
        • INCRBY
        • KEYS
        • MONITOR
        • PEXPIRE
        • PEXPIREAT
        • PTTL
        • ROLE
        • SADD
        • SCARD
        • RENAME
        • SET
        • SETEX
        • PSETEX
        • SETRANGE
        • SISMEMBER
        • SMEMBERS
        • SREM
        • STRLEN
        • ZRANGE
        • TSADD
        • TSCARD
        • TSGET
        • TSLASTN
        • TSRANGEBYTIME
        • TSREM
        • TSREVRANGEBYTIME
        • TTL
        • ZADD
        • ZCARD
        • ZRANGEBYSCORE
        • ZREM
        • ZREVRANGE
        • ZSCORE
        • PUBSUB
        • PUBLISH
        • SUBSCRIBE
        • UNSUBSCRIBE
        • PSUBSCRIBE
        • PUNSUBSCRIBE
> Develop > Ecosystem integrations >

Apache Spark

Attention

This page documents an earlier version. Go to the latest (v2.3) version.
  • Java
  • Python
  • Scala

Maven

Add the following snippet to your pom.xml for Scala 2.10:

<dependency>
  <groupId>com.yugabyte.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>2.0.5-yb-2</version>
</dependency>

For Scala 2.11:

<dependency>
  <groupId>com.yugabyte.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>2.0.5-yb-2</version>
</dependency>

Sample application

Running the Spark word-count sample application

You can run our Spark-based sample app with:

$ java -jar yb-sample-apps.jar --workload CassandraSparkWordCount --nodes 127.0.0.1:9042

It reads data from a table with sentences — by default, it generates an input table ybdemo_keyspace.lines, computes the frequencies of the words, and writes the result to the output table ybdemo_keyspace.wordcounts.

Examining the source code

To look at the source code, you can check:

  • the source file in our GitHub source repo here
  • untar the jar java/yb-sample-apps-sources.jar in the download bundle

Most of the logic is in the run() method of the CassandraSparkWordCount class (in the file src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java). Some of the key portions of the sample program are explained in the sections below.

Main sections of an Apache Spark program on Yugabyte

Initializing the Spark context

The SparkConf object is configured as follows:

// Setup the local spark master, with the desired parallelism.
SparkConf conf = new SparkConf().setAppName("yb.wordcount")
                                .setMaster("local[1]")       // num Spark threads
                                .set("spark.cassandra.connection.host", hostname);

// Create the Java Spark context object. JavaSparkContext sc = new JavaSparkContext(conf);

// Create the Cassandra connector to Spark. CassandraConnector connector = CassandraConnector.apply(conf);

// Create a Cassandra session, and initialize the keyspace. Session session = connector.openSession();

Setting the input source

To set the input data for Spark, you can do one of the following.

  • Reading from a table with a column line as the input
// Read rows from table and convert them to an RDD.
JavaRDD<String> rows = javaFunctions(sc).cassandraTable(keyspace, inputTable)
                                        .select("line")
                                        .map(row -> row.getString("line"));
  • Reading from a file as the input:
// Read the input file and convert it to an RDD.
JavaRDD<String> rows = sc.textFile(inputFile);

Performing the word count processing

The word count is performed using the following code snippet.

// Perform the word count.
JavaPairRDD<String, Integer> counts = rows.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                                          .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                                          .reduceByKey((x, y) ->  x + y);

Setting the output table

The output is written to the outTable table.

// Create the output table.
session.execute("CREATE TABLE IF NOT EXISTS " + outTable +
                " (word VARCHAR PRIMARY KEY, count INT);");

// Save the output to the CQL table. javaFunctions(counts).writerBuilder(keyspace, outputTable, mapTupleToRow(String.class, Integer.class)) .withColumnSelector(someColumns("word", "count")) .saveToCassandra();

PySpark

Start PySpark with for Scala 2.10:

$ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.10:2.0.5-yb-2

For Scala 2.11:

$ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.11:2.0.5-yb-2

sbt

Add the following library dependency to your project configuration:

libraryDependencies += "com.yugabyte.spark" %% "spark-cassandra-connector" % "2.0.5-yb-2"
Ask our community
  • Slack
  • Github
  • Forum
  • StackOverflow
Yugabyte
Contact Us
Copyright © 2017-2020 Yugabyte, Inc. All rights reserved.