Lightflus supports Redis, MySQL, Kafka, and other data sinks. It writes the results calculated by a dataflow into these sinks according to your configuration.
This chapter describes how to use the Kafka Sink in Lightflus.
import {kafka} from "lightflus-api/src/connectors/connectors";
import Kafka = kafka.Kafka;

let sink = Kafka
    .builder()
    // Kafka brokers to connect to
    .brokers(["localhost:9092"])
    // the topic the results are written to
    .topic("topic")
    // if a partition is set, data will always be written to this partition;
    // keyed hash partitioning is not supported yet
    .partition(1)
    // serialization type of the sink records
    .build<string>(undefined, typeof "");
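A sink takes effect only when it terminates a dataflow. The sketch below shows one way to wire the Kafka sink into a pipeline. It is a minimal example under a few assumptions: the topic names, broker address, consumer group, and context names are placeholders, and the group/createFlow/map/sink/execute calls follow the word-count example in the Lightflus README.

import {context} from "lightflus-api/src/stream/context";
import {kafka} from "lightflus-api/src/connectors/connectors";
import ExecutionContext = context.ExecutionContext;
import Kafka = kafka.Kafka;

async function kafkaSinkExample(ctx: ExecutionContext) {
    // source: read raw strings from an input topic (placeholder names)
    let source = Kafka
        .builder()
        .brokers(["localhost:9092"])
        .topic("topic_in")
        .group("example_group")
        .build<string>(undefined, typeof "");

    // sink: write the transformed strings to an output topic
    let sink = Kafka
        .builder()
        .brokers(["localhost:9092"])
        .topic("topic_out")
        .build<string>(undefined, typeof "");

    // create the dataflow, transform each record, and terminate it with the sink
    await source.createFlow(ctx)
        .map(value => value.toUpperCase())
        .sink(sink)
        .execute();
}

kafkaSinkExample(ExecutionContext.new("kafkaSinkExample", "default")).then();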
This chapter describes how to use the Redis Sink in Lightflus.
import {redis} from "lightflus-api/src/connectors/connectors";
import Redis = redis.Redis;

let sink = Redis.new<{ t0: number, t1: string }>()
    .host("localhost")
    // the key extractor must be configured; it maps each record to its Redis key
    .keyExtractor((v) => v.t1)
    // the value extractor must be configured; it maps each record to the stored value
    .valueExtractor((v) => v.t0);
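Once built, the Redis sink is attached to a dataflow like any other sink. Below is a minimal sketch, assuming `stream` is an existing dataflow of { t0: number, t1: string } records and that sink/execute behave as in the word-count example from the Lightflus README.

// `stream` is assumed to be a dataflow of { t0: number, t1: string } records
await stream
    // each record is stored in Redis under the extracted key (v.t1) with the extracted value (v.t0)
    .sink(sink)
    .execute();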
This chapter describes how to use the MySQL Sink in Lightflus.
import {mysql} from "lightflus-api/src/connectors/connectors";
import Mysql = mysql.Mysql;

let sink = Mysql.new<{ id: string, key: string, value: string }>()
    // connection configuration of the target MySQL database
    .connection({
        host: "localhost",
        username: "root",
        password: "123",
        database: "test"
    })
    // the SQL statement executed for each record; each ? is a parameter placeholder
    .statement("insert into example (id, key, value) values (?,?,?)")
    // extractors bind each placeholder, by zero-based index, to a field of the record
    .extractors(statement => {
        statement.setString(0, val => val.id);
        statement.setString(1, val => val.key);
        statement.setString(2, val => val.value);
    });
For now, the MySQL sink only supports insert and update statements, and statement parameters can only be bound as string or number values.
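As with the other sinks, the MySQL sink terminates a dataflow. Below is a minimal sketch, assuming `stream` is an existing dataflow of { id: string, key: string, value: string } records and that sink/execute behave as in the Lightflus README.

// `stream` is assumed to be a dataflow of { id: string, key: string, value: string } records;
// a record like { id: "1", key: "color", value: "red" } executes
//   insert into example (id, key, value) values ('1', 'color', 'red')
await stream
    .sink(sink)
    .execute();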