Automating Deployment of KSQL Architecture

OpenMarket – June 1, 2020

by Marisa Yamasaki

It’s no secret that automation is essential to streamlined development. At OpenMarket, we collaborate across teams and time zones, which requires a structured deployment process to stay organised. This was certainly true during a recent proof of concept using KSQL for data transformation. While exploring the capabilities of Confluent technologies was exciting, developing a build automation pipeline proved to be a challenge. This post outlines our solution for managing the deployment of KSQL streams and tables.

To integrate with our existing CI/CD, we used Jenkins pipelines to run Ansible playbooks, but these concepts can be applied to other configuration tools.

Setting up and tearing down

Our project manipulated data from several input topics, creating a sequence of streams and tables to calculate results. Each query built on previous ones, so if a change was made to the KSQL syntax (e.g. modifying a time window, or changing the type of join), then the entire sequence had to be torn down and rebuilt from scratch.

To illustrate this concept, consider a simple example with just two queries. The first creates a stream from an existing input topic, and the second re-partitions the stream:

CREATE STREAM PERSON_IDENTIFIER (first VARCHAR, last VARCHAR)
    WITH (KAFKA_TOPIC='person_topic', VALUE_FORMAT='JSON', KEY='last');
CREATE STREAM PERSON_IDENTIFIER_FIRST
    AS SELECT * FROM PERSON_IDENTIFIER PARTITION BY first;

These queries can be placed in a .sql file and executed with KSQL’s RUN SCRIPT command.
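
For example, assuming the KSQL CLI is installed at ./current/bin/ksql (as in the playbooks below) and the server is listening on the default port 8088, the setup script can be run from a bash shell:

# Execute every statement in the setup script against the local KSQL server
./current/bin/ksql http://localhost:8088 <<< "RUN SCRIPT '/dir/ksql-setup-commands.sql';"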

Tearing down looks similar, except the streams must be dropped in the opposite order, because the second depends on the first:

DROP STREAM IF EXISTS PERSON_IDENTIFIER_FIRST DELETE TOPIC;
DROP STREAM IF EXISTS PERSON_IDENTIFIER;

In an Ansible playbook, we copy the setup and teardown scripts to the host, then run them using RUN SCRIPT. Note that the <<< here-string syntax used below requires a bash-compatible shell on the remote host:

- name: "Tear down and set up KSQL queries"
  hosts: "{{ hostname }}"
  any_errors_fatal: True

  tasks:
    - name: "Copy teardown script file to host"
      copy:
        src: /dir/ksql-teardown-commands.sql
        dest: "/dir/ksql-teardown-commands.sql"

    - name: "Run teardown commands in KSQL"
      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"RUN SCRIPT '/dir/ksql-teardown-commands.sql';\""

    - name: "Copy setup script file to host"
      copy:
        src: /dir/ksql-setup-commands.sql
        dest: "/dir/ksql-setup-commands.sql"

    - name: "Run setup commands in KSQL"
      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"RUN SCRIPT '/dir/ksql-setup-commands.sql';\""

This playbook can be run from the command line or triggered using an automation server such as Jenkins.
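
From the command line, the invocation might look like the following sketch (the playbook filename and inventory path are illustrative); a Jenkins pipeline can trigger the same command from a shell step:

# Run the redeploy playbook against a specific KSQL host
ansible-playbook -i /dir/inventory ksql-redeploy.yml --extra-vars "hostname=ksql-host-01"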

Terminating the queries

The above commands handle the destruction and reconstruction of streams, but they do not terminate the underlying persistent queries. Attempting to drop PERSON_IDENTIFIER will result in an error, because the CSAS query used to construct PERSON_IDENTIFIER_FIRST is still reading from it. As of Confluent 5.3, it is not possible to automatically terminate a query when a stream or table is dropped, nor is bulk termination supported. The simplest automated solution is to fetch the running queries and terminate them one by one.
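
Done by hand in the KSQL CLI, that process would look something like the sketch below (the query id is illustrative; KSQL generates ids from the statement that created the stream):

# List the running queries, then terminate one by its generated id
./current/bin/ksql http://localhost:8088 <<< "SHOW QUERIES;"
./current/bin/ksql http://localhost:8088 <<< "TERMINATE CSAS_PERSON_IDENTIFIER_FIRST_0;"

The following playbook automates this fetch-and-terminate loop: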

- name: "Terminate running queries on all hosts in group"
  hosts: "{{ hostname }}"
  any_errors_fatal: True

  tasks:
    # Fetch and terminate currently running queries
    - name: "Get currently running queries via REST"
      uri:
        url: "http://{{ hostname }}:8088/ksql"
        method: POST
        body: "{\"ksql\": \"SHOW QUERIES;\"}"
        headers:
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8"
        return_content: yes
        body_format: json
      register: all_queries

    - name: "Parse query ids"
      set_fact:
        query_ids: "{{ all_queries | to_json | from_json | json_query(query) | list }}"
      vars:
        query: "json[0].queries[?starts_with(id,'CSAS_PERSON_') || starts_with(id,'CTAS_PERSON_')].id"
      delegate_to: 127.0.0.1

    - name: "Terminate queries by id"
      shell: "./current/bin/ksql http://{{ hostname }}:8088 <<< \"TERMINATE {{ item }};\""
      with_items: "{{ query_ids }}"

Here we use the KSQL server’s REST API, fetching all of the currently running queries and filtering for the ones associated with this project (i.e. those beginning with CSAS_PERSON_ or CTAS_PERSON_). The TERMINATE command is then issued for each query id.
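
Outside of Ansible, the same fetch-and-filter step can be sketched with curl and jq (assuming a local server on the default port; the jq filter mirrors the JMESPath expression in the playbook):

# Fetch the running queries and print the ids matching this project's prefixes
curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql": "SHOW QUERIES;"}' \
  | jq -r '.[0].queries[].id | select(startswith("CSAS_PERSON_") or startswith("CTAS_PERSON_"))'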

If a query fails to terminate, the error will not be detected by Ansible, because the ksql CLI is invoked through the shell module and does not surface the failure through its exit code. As we only want to proceed once all relevant queries have terminated, we repeat the fetch-and-filter process, failing the playbook if the filtered list is not empty:

    # Pause for 5 seconds to allow the queries to finish terminating
    - pause:
        seconds: 5

    # Confirm that all PERSON queries were terminated
    - name: "Get remaining queries via REST"
      uri:
        url: "http://{{ hostname }}:8088/ksql"
        method: POST
        body: "{\"ksql\": \"SHOW QUERIES;\"}"
        headers:
          Content-Type: "application/vnd.ksql.v1+json; charset=utf-8"
        return_content: yes
        body_format: json
      register: remaining_queries

    - name: "Parse remaining query ids"
      set_fact:
        remaining_ids: "{{ remaining_queries | to_json | from_json | json_query(remaining_query) | list }}"
      vars:
        remaining_query: "json[0].queries[?starts_with(id,'CSAS_PERSON_') || starts_with(id,'CTAS_PERSON_')].id"
      delegate_to: 127.0.0.1

    - fail:
        msg: "Some queries were not successfully terminated: {{ remaining_ids }}"
      when: remaining_ids|length != 0

Once query termination completes successfully, the streams can be dropped and rebuilt. New streams and tables can be added to the setup .sql file, and existing ones can be modified, as long as the corresponding teardown .sql file is kept up to date. Redeployment will then rebuild the KSQL environment to reflect the changes.
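
Putting it all together, a full redeployment chains the two playbooks, terminating queries before tearing down and rebuilding (the playbook filenames here are hypothetical):

# Terminate the project's queries, then tear down and rebuild the streams
ansible-playbook -i /dir/inventory terminate-queries.yml --extra-vars "hostname=ksql-host-01" \
  && ansible-playbook -i /dir/inventory ksql-redeploy.yml --extra-vars "hostname=ksql-host-01"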

Other solutions

There are other approaches to bulk query termination (see Robin Moffatt's post for an interesting one), but this method worked best with our existing automation tools.

Our chain of KSQL queries became increasingly complex as we added streams and tables, so maintaining an organised method of setup and teardown was essential. Had this project been more than a proof of concept, we would have sought out a more refined solution, but the steps above worked well to support our rapid development process.
