Let’s start by creating a Lightflus task. We use word count as an example to show you how to deploy a dataflow job on Lightflus.
npm install typescript
npx tsc --init
vi wordCount.ts
The project’s structure looks like this:
example |
-- tsconfig.json
-- package.json
-- node_modules
-- wordCount.ts
Then install Lightflus API dependency
npm i lightflus-api
Set the environment variables:
export LIGHTFLUS_ENDPOINT=localhost:8080
We recommend that you set up the properties in the tsconfig.json file as below:
{
"compilerOptions": {
"module": "commonjs",
"target": "es2016",
"sourceMap": true,
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true,
"strictNullChecks": false,
"forceConsistentCasingInFileNames": false,
"strictPropertyInitialization": false,
"esModuleInterop": true,
"moduleResolution": "node"
}
}
// wordCount example
import {context} from "lightflus-api/src/stream/context";
import {kafka, redis} from "lightflus-api/src/connectors/connectors";
import ExecutionContext = context.ExecutionContext;
import Kafka = kafka.Kafka;
import Redis = redis.Redis;
/**
 * Word-count dataflow: reads sentence strings from a Kafka topic,
 * splits them into words, counts occurrences per word, and persists
 * the running totals in Redis (key = word, value = count).
 */
async function wordCount(ctx: ExecutionContext) {
  // Kafka source producing raw sentence strings
  const kafkaSource = Kafka
    .builder()
    .brokers(["kafka:9092"])
    // topic to consume from
    .topic("topic_2")
    // consumer group id
    .group("word_count")
    // deserialize each record as a string
    .build<string>(undefined, typeof "");

  // Redis sink: t1 (the word) becomes the key, t0 (its count) the value
  const redisSink = Redis.new<{ t0: number, t1: string }>()
    .host("redis")
    .keyExtractor((pair) => pair.t1)
    .valueExtractor((pair) => pair.t0.toString());

  // Build the dataflow from the Kafka source
  const sentences = kafkaSource.createFlow(ctx);

  // Split each sentence into (count = 1, word) pairs, group by word,
  // sum the counts, write results to Redis, then submit the job.
  await sentences
    .flatMap(sentence => sentence.split(" ").map(word => {
      return {t0: 1, t1: word};
    }))
    .keyBy(pair => pair.t1)
    .reduce((left, right) => {
      return {t1: left.t1, t0: left.t0 + right.t0};
    })
    // persist the aggregated counts into the Redis sink
    .sink(redisSink)
    // submit and run the dataflow
    .execute();
}
// Kick off the job. Use .catch instead of a bare .then() so a failed
// deployment/execution is reported rather than becoming an unhandled
// promise rejection (which can crash the process on modern Node.js).
wordCount(ExecutionContext.new("wordCount", "default")).catch(err => console.error(err));