Deploy Your First Lightflus Application

Let’s start to create a Lightflus task. We use word count as the example to show you how to deploy a dataflow job on Lightflus

  1. Initialize a Typescript project
npm install typescript
npx tsc --init

vi wordCount.ts

The project’s structure likes below:

example |
	-- tsconfig.json
	-- package.json
	-- node_modules
	-- wordCount.ts

Then install Lightflus API dependency

npm i lightflus-api

Set Environment variables

export LIGHTFLUS_ENDPOINT=localhost:8080
  1. Modify tsconfig.json

We recommand you to set up properties in tsconfig.json file like below:

{
  "compilerOptions": {
    "module": "commonjs",
    "target": "es2016",
    "sourceMap": true,
    "baseUrl": "./",
    "incremental": true,
    "skipLibCheck": true,
    "strictNullChecks": false,
    "forceConsistentCasingInFileNames": false,
    "strictPropertyInitialization": false,
    "esModuleInterop": true,
    "moduleResolution": "node"
  }
}
  1. Implement word count
// wordCount example

import {context} from "lightflus-api/src/stream/context";
import {kafka, redis} from "lightflus-api/src/connectors/connectors";
import ExecutionContext = context.ExecutionContext;
import Kafka = kafka.Kafka;
import Redis = redis.Redis;

async function wordCount(ctx: ExecutionContext) {
    // fetch string stream from kafka
    let source = Kafka
        .builder()
        .brokers(["kafka:9092"])
        // topic
        .topic("topic_2")
        // groupId
        .group("word_count")
        // deserialization type
        .build<string>(undefined, typeof "");

    // It will persist the counting values in Redis
    let sink = Redis.new<{ t0: number, t1: string }>()
        .host("redis")
        .keyExtractor((v) => v.t1)
        .valueExtractor((v) => v.t0.toString());

    // create a Dataflow
    let stream = source.createFlow(ctx);

    // We design the Dataflow
    await stream.flatMap(value => value.split(" ").map(v => {
        return {t0: 1, t1: v};
    }))
        .keyBy(v => v.t1)
        .reduce((v1, v2) => {
            return {t1: v1.t1, t0: v1.t0 + v2.t0};
        })
        // write the results into Redis sink
        .sink(sink)
        // Then execute
        .execute();
}

wordCount(ExecutionContext.new("wordCount", "default")).then();
  1. Then you can deploy your application