Apache Spark

Maven

Add the following snippet to your pom.xml for Scala 2.10:

<dependency>
  <groupId>com.yugabyte.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>2.0.5-yb-2</version>
</dependency>

For Scala 2.11:

<dependency>
  <groupId>com.yugabyte.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>2.0.5-yb-2</version>
</dependency>

Sample application

Running the Spark word-count sample application

You can run our Spark-based sample app with:

$ java -jar yb-sample-apps.jar --workload CassandraSparkWordCount --nodes 127.0.0.1:9042

The application reads sentences from an input table, computes the frequency of each word, and writes the result to the output table ybdemo_keyspace.wordcounts. By default, it generates the input table ybdemo_keyspace.lines.

Examining the source code

To look at the source code, you can either:

  • view the source file in our GitHub source repo
  • untar the jar java/yb-sample-apps-sources.jar in the download bundle

Most of the logic is in the run() method of the CassandraSparkWordCount class (in the file src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java). Some of the key portions of the sample program are explained in the sections below.

Main sections of an Apache Spark program on Yugabyte

Initializing the Spark context

The SparkConf object is configured as follows:

// Setup the local spark master, with the desired parallelism.
SparkConf conf = new SparkConf().setAppName("yb.wordcount")
                                .setMaster("local[1]")       // num Spark threads
                                .set("spark.cassandra.connection.host", hostname);

// Create the Java Spark context object.
JavaSparkContext sc = new JavaSparkContext(conf);

// Create the Cassandra connector to Spark.
CassandraConnector connector = CassandraConnector.apply(conf);

// Create a Cassandra session, and initialize the keyspace.
Session session = connector.openSession();
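
The snippet above takes a hostname variable, while the sample command passes --nodes 127.0.0.1:9042. How the sample app derives one from the other is not shown here, so the following is only a minimal sketch under that assumption: it splits a host:port string into the connector settings spark.cassandra.connection.host and spark.cassandra.connection.port. The class and method names are hypothetical, and the fallback to port 9042 is an assumption taken from the port used in the sample command.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the sample application.
final class ConnectionSettingsSketch {

    // Splits "host:port" into the two connector settings.
    // Assumption: a node without a port falls back to 9042.
    static Map<String, String> fromNode(String node) {
        int colon = node.lastIndexOf(':');
        String host = colon < 0 ? node : node.substring(0, colon);
        String port = colon < 0 ? "9042" : node.substring(colon + 1);

        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("spark.cassandra.connection.host", host);
        settings.put("spark.cassandra.connection.port", port);
        return settings;
    }

    public static void main(String[] args) {
        // Prints each key/value pair derived from the node address.
        fromNode("127.0.0.1:9042")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each entry could then be applied to the SparkConf with conf.set(key, value) before the JavaSparkContext is created.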

Setting the input source

To set the input data for Spark, you can do one of the following.

  • Reading from a table with a column line as the input:
// Read rows from table and convert them to an RDD.
JavaRDD<String> rows = javaFunctions(sc).cassandraTable(keyspace, inputTable)
                                        .select("line")
                                        .map(row -> row.getString("line"));
  • Reading from a file as the input:
// Read the input file and convert it to an RDD.
JavaRDD<String> rows = sc.textFile(inputFile);

Performing the word count processing

The word count is performed using the following code snippet.

// Perform the word count.
JavaPairRDD<String, Integer> counts = rows.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                                          .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                                          .reduceByKey((x, y) ->  x + y);
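
To see what the flatMap, mapToPair, and reduceByKey steps compute without starting Spark, the same logic can be sketched with plain Java streams on an in-memory list. This is an illustration only, not the sample app's code; the class name WordCountSketch and the input lines are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical class; mirrors the Spark pipeline on a local list.
final class WordCountSketch {

    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // Like flatMap: split each line on single spaces and flatten.
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Like mapToPair + reduceByKey: pair each word with 1 and
                // sum the ones per word. TreeMap keeps the words sorted.
                .collect(Collectors.toMap(word -> word, word -> 1,
                                          Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("one two two", "two three");
        System.out.println(countWords(lines)); // prints {one=1, three=1, two=3}
    }
}
```

Because the split is on a single space, consecutive spaces in a line produce empty strings that are counted as words. Splitting on the regular expression "\\s+" instead avoids that for interior whitespace.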

Setting the output table

The word counts are written to the output table, referred to as outTable in the CREATE TABLE statement and as outputTable in the writer call.

// Create the output table.
session.execute("CREATE TABLE IF NOT EXISTS " + outTable +
                " (word VARCHAR PRIMARY KEY, count INT);");

// Save the output to the CQL table.
javaFunctions(counts).writerBuilder(keyspace, outputTable, mapTupleToRow(String.class, Integer.class))
                     .withColumnSelector(someColumns("word", "count"))
                     .saveToCassandra();
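
The snippet above uses two names for the output table. A plausible reading, stated here as an assumption rather than taken from the sample source, is that outTable is the keyspace-qualified name (keyspace.table) and outputTable is the bare table name passed to writerBuilder together with the keyspace. The sketch below shows the statement string this would produce; the class and method names are hypothetical.

```java
// Hypothetical helper; not part of the sample application.
final class OutputTableSketch {

    // Assumption: the qualified name is "<keyspace>.<table>".
    static String qualifiedName(String keyspace, String table) {
        return keyspace + "." + table;
    }

    // Builds the same CREATE TABLE statement as the snippet above.
    static String createTableStatement(String outTable) {
        return "CREATE TABLE IF NOT EXISTS " + outTable
                + " (word VARCHAR PRIMARY KEY, count INT);";
    }

    public static void main(String[] args) {
        String outTable = qualifiedName("ybdemo_keyspace", "wordcounts");
        System.out.println(createTableStatement(outTable));
    }
}
```

With the default names from the sample application, the statement targets ybdemo_keyspace.wordcounts, and each (word, count) tuple becomes one row keyed by word.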

PySpark

Start PySpark with the connector package for Scala 2.10:

$ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.10:2.0.5-yb-2

For Scala 2.11:

$ pyspark --packages com.yugabyte.spark:spark-cassandra-connector_2.11:2.0.5-yb-2

sbt

Add the following library dependency to your project configuration:

libraryDependencies += "com.yugabyte.spark" %% "spark-cassandra-connector" % "2.0.5-yb-2"
Copyright © 2017-2019 Yugabyte, Inc. All rights reserved.