All Projects → bryanyang0528 → Ksql Python

bryanyang0528 / Ksql Python

Licence: mit
A python wrapper for the KSQL REST API.

Programming Languages

python
139335 projects - #7 most used programming language

Labels

Projects that are alternatives of or similar to Ksql Python

Schema Registry
A CLI and Go client for Kafka Schema Registry
Stars: ✭ 105 (+7.14%)
Mutual labels:  api, kafka
Devops Bash Tools
550+ DevOps Bash Scripts - AWS, GCP, Kubernetes, Kafka, Docker, APIs, Hadoop, SQL, PostgreSQL, MySQL, Hive, Impala, Travis CI, Jenkins, Concourse, GitHub, GitLab, BitBucket, Azure DevOps, TeamCity, Spotify, MP3, LDAP, Code/Build Linting, pkg mgmt for Linux, Mac, Python, Perl, Ruby, NodeJS, Golang, Advanced dotfiles: .bashrc, .vimrc, .gitconfig, .screenrc, .tmux.conf, .psqlrc ...
Stars: ✭ 226 (+130.61%)
Mutual labels:  api, kafka
Zerocode
A community-developed, free, open source, microservices API automation and load testing framework built using JUnit core runners for Http REST, SOAP, Security, Database, Kafka and much more. Zerocode Open Source enables you to create, change, orchestrate and maintain your automated test cases declaratively with absolute ease.
Stars: ✭ 482 (+391.84%)
Mutual labels:  api, kafka
Soundcloud
Soundcloud.com API wrapper written in PHP with OAuth2 support.
Stars: ✭ 94 (-4.08%)
Mutual labels:  api
Api Restful Con Laravel Guia Definitiva
Repositorio para el código base del curso "API RESTful con Laravel - Guía Definitiva"
Stars: ✭ 95 (-3.06%)
Mutual labels:  api
Streamx
kafka-connect-s3 : Ingest data from Kafka to Object Stores(s3)
Stars: ✭ 96 (-2.04%)
Mutual labels:  kafka
Bluelinky
An unofficial nodejs API wrapper for Hyundai bluelink
Stars: ✭ 94 (-4.08%)
Mutual labels:  api
Tiledesk Server
Tiledesk server. Tiledesk is an Open Source Live Chat platform written in NodeJs and MongoDB
Stars: ✭ 94 (-4.08%)
Mutual labels:  api
Api2html
Using the data from your API, generate the HTML on the fly! Server-side rendering of the mustache templates
Stars: ✭ 97 (-1.02%)
Mutual labels:  api
Appy Backend
A user system to bootstrap your app.
Stars: ✭ 96 (-2.04%)
Mutual labels:  api
Satis Server
🐳 Private, self-hosted Composer/Satis repository with unlimited private and open-source packages and support for Git, Mercurial, and Subversion. HTTP API, HTTPs support, webhook handler, scheduled builds, Slack and HipChat integration.
Stars: ✭ 96 (-2.04%)
Mutual labels:  api
Myetherapi
An API by MyEtherWallet. ETH / Ropsten / JSON RPC / Web3
Stars: ✭ 95 (-3.06%)
Mutual labels:  api
Opensubtitles Api
nodejs opensubtitles.org api wrapper for downloading and uploading subtitles in multiple langs
Stars: ✭ 96 (-2.04%)
Mutual labels:  api
Wa Automate Nodejs
💬 🤖 The most advanced NodeJS WhatsApp library for chatbots with advanced features. Be sure to 🌟 this repository for updates!
Stars: ✭ 1,326 (+1253.06%)
Mutual labels:  api
Graphql Dotnetcore
GraphQL for .NET core based on https://github.com/graphql/graphql-js
Stars: ✭ 97 (-1.02%)
Mutual labels:  api
Http restful api
整理HTTP后台端的RESTful API方面的知识
Stars: ✭ 94 (-4.08%)
Mutual labels:  api
Willa
A Clojure DSL for Kafka Streams
Stars: ✭ 97 (-1.02%)
Mutual labels:  kafka
Kafka Php
kafka php client
Stars: ✭ 1,340 (+1267.35%)
Mutual labels:  kafka
Python Binance Chain
Binance Chain Exchange API python implementation for automated trading
Stars: ✭ 96 (-2.04%)
Mutual labels:  api
Shellfuncs
Python API to execute shell functions as they would be Python functions
Stars: ✭ 96 (-2.04%)
Mutual labels:  api

ksql-python

A python wrapper for the KSQL REST API. Easily interact with the KSQL REST API using this library.

Supported KSQLDB version: 0.10.1+ Supported Python version: 3.5+

.. image:: https://travis-ci.org/bryanyang0528/ksql-python.svg?branch=master :target: https://travis-ci.org/bryanyang0528/ksql-python

.. image:: https://codecov.io/gh/bryanyang0528/ksql-python/branch/master/graph/badge.svg :target: https://codecov.io/gh/bryanyang0528/ksql-python

.. image:: https://pepy.tech/badge/ksql :target: https://pepy.tech/project/ksql

.. image:: https://pepy.tech/badge/ksql/month :target: https://pepy.tech/project/ksql/month

.. image:: https://img.shields.io/badge/license-MIT-yellow.svg :target: https://github.com/bryanyang0528/ksql-python/blob/master/LICENSE

Installation

.. code:: bash

pip install ksql

Or

.. code:: bash

git clone https://github.com/bryanyang0528/ksql-python
cd ksql-python
python setup.py install

Getting Started

Setup for KSQL


This is the GITHUB page of KSQL. https://github.com/confluentinc/ksql

If you have installed open source Confluent CLI (e.g. by installing Confluent Open Source or Enterprise Platform), you can start KSQL and its dependencies with one single command:

.. code:: bash

    confluent start ksql-server

Setup for ksql-python API
  • Setup for the KSQL API:

.. code:: python

from ksql import KSQLAPI
client = KSQLAPI('http://ksql-server:8088')
  • Setup for KSQl API with logging enabled:

.. code:: python

import logging
from ksql import KSQLAPI
logging.basicConfig(level=logging.DEBUG)
client = KSQLAPI('http://ksql-server:8088')
  • Setup for KSQL API with Basic Authentication

.. code:: python

from ksql import KSQLAPI
client = KSQLAPI('http://ksql-server:8088', api_key="your_key", secret="your_secret")

Options


+---------------+-----------+------------+--------------------------------------------------------------+
| Option        | Type      | Required   | Description                                                  |
+===============+===========+============+==============================================================+
| ``url``       | string    | yes        | Your ksql-server url. Example: ``http://ksql-server:8080``   |
+---------------+-----------+------------+--------------------------------------------------------------+
| ``timeout``   | integer   | no         | Timout for Requests. Default: ``5``                          |
+---------------+-----------+------------+--------------------------------------------------------------+
| ``api_key``   | string    | no         | API Key to use on the requests                               |
+---------------+-----------+------------+--------------------------------------------------------------+
| ``secret``    | string    | no         | Secret to use on the requests                                |
+---------------+-----------+------------+--------------------------------------------------------------+

Main Methods

ksql ^^^^

This method can be used for some KSQL features which are not supported via other specific methods like query, create_stream or create_stream_as. The following example shows how to execute the show tables statement:

.. code:: python

client.ksql('show tables')
  • Example Response [{'tables': {'statementText': 'show tables;', 'tables': []}}]

query ^^^^^

It will execute sql query and keep listening streaming data.

.. code:: python

client.query('select * from table1')

This command returns a generator. It can be printed e.g. by reading its values via next(query) or a for loop. Here is a complete example:

.. code:: python

from ksql import KSQLAPI client = KSQLAPI('http://localhost:8088') query = client.query('select * from table1') for item in query: print(item)

  • Example Response

    ::

    {"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
    

Query with HTTP/2 ^^^^^^^^^^^^^^^^^ Execute queries with the new /query-stream endpoint. Documented here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>_

To execute a sql query use the same syntax as the regular query, with the additional use_http2=True parameter.

.. code:: python

client.query('select * from table1', use_http2=True)

A generator is returned with the following example response

::

   {"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
   [3,43.0,"Palo Alto"]
   [3,43.0,"Palo Alto"]
   [3,43.0,"Palo Alto"]

To terminate the query above use the close_query call. Provide the queryId returned from the query call.

.. code:: python

client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")

Insert rows into a Stream with HTTP/2 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Uses the new /inserts-stream endpoint. See documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>_

.. code:: python

rows = [
        {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
        {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
    ]

results = self.api_client.inserts_stream("my_stream_name", rows)

An array of object will be returned on success, with the status of each row inserted.

Simplified API


create_stream/ create_table
^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code:: python

    client.create_stream(table_name=table_name,
                         columns_type=columns_type,
                         topic=topic,
                         value_format=value_format)

Options
^^^^^^^

+-----------------+-----------+----------+--------------------------------------------------------------+
| Option          | Type      | Required | Description                                                  |
+=================+===========+==========+==============================================================+
| ``table_name``  | string    | yes      | name of stream/table                                         |
+-----------------+-----------+----------+--------------------------------------------------------------+
| ``columns_type``| list      | yes      | ex:``['viewtime bigint','userid varchar','pageid varchar']`` |
+-----------------+-----------+----------+--------------------------------------------------------------+
| ``topic``       | string    | yes      | Kafka topic                                                  |
+-----------------+-----------+----------+--------------------------------------------------------------+
| ``value_format``| string    | no       | ``JSON`` (Default) or ``DELIMITED`` or ``AVRO``              |
+-----------------+-----------+----------+--------------------------------------------------------------+
| ``key``         | string    | for Table| Key (used for JOINs)                                         |
+-----------------+-----------+----------+--------------------------------------------------------------+


-  Responses

:If create table/stream succeed:
  return True

:If failed:
  raise a CreateError(respose_from_ksql_server)

create_stream_as
^^^^^^^^^^^^^^^^

a simplified api for creating stream as select

.. code:: python

    client.create_stream_as(table_name=table_name,
                            select_columns=select_columns,
                            src_table=src_table,
                            kafka_topic=kafka_topic,
                            value_format=value_format,
                            conditions=conditions,
                            partition_by=partition_by,
                            **kwargs)


.. code:: sql

  CREATE STREAM <table_name>
  [WITH ( kafka_topic=<kafka_topic>, value_format=<value_format>, property_name=expression ... )]
  AS SELECT  <select_columns>
  FROM <src_table>
  [WHERE <conditions>]
  PARTITION BY <partition_by>];

Options
^^^^^^^

+-------------------+-----------+----------+--------------------------------------------------------------+
| Option            | Type      | Required | Description                                                  |
+===================+===========+==========+==============================================================+
| ``table_name``    | string    | yes      | name of stream/table                                         |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``select_columns``| list      | yes      | you can select ``[*]`` or ``['columnA', 'columnB']``         |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``src_table``     | string    | yes      | name of source table                                         |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``kafka_topic``   | string    | no       | The name of the Kafka topic of this new stream(table).       |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``value_format``  | string    | no       | ``DELIMITED``, ``JSON``(Default) or ``AVRO``                 |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``conditions``    | string    | no       | The conditions in the where clause.                          |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``partition_by``  | string    | no       | Data will be distributed across partitions by this column.   |
+-------------------+-----------+----------+--------------------------------------------------------------+
| ``kwargs``        | pair      | no       | please provide ``key=value`` pairs. Please see more options. |
+-------------------+-----------+----------+--------------------------------------------------------------+

KSQL JOINs

KSQL JOINs between Streams and Tables are not supported yet via explicit methods, but you can use the ksql method for this like the following:

.. code:: python

client.ksql("CREATE STREAM join_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='join_per_user') AS SELECT Time, Amount FROM source c INNER JOIN users u on c.user = u.userid WHERE u.USERID = 1")

FileUpload


upload
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Run commands from a .ksql file. Can only support ksql commands and not streaming queries.

.. code:: python

     from ksql.upload import FileUpload
     pointer = FileUpload('http://ksql-server:8080')
     pointer.upload('rules.ksql')


Options
^^^^^^^

+-----------------+-----------+----------+--------------------------------------------------------------+
| Option          | Type      | Required | Description                                                  |
+=================+===========+==========+==============================================================+
| ``ksqlfile``    | string    | yes      | name of file containing the rules                            |
+-----------------+-----------+----------+--------------------------------------------------------------+


-  Responses

:If ksql-commands succesfully executed:
  return (List of server response for all commands)

:If failed:
  raise the appropriate error

More Options
^^^^^^^^^^^^

There are more properties (partitions, replicas, etc...) in the official document.

`KSQL Syntax Reference <https://github.com/confluentinc/ksql/blob/0.1.x/docs/syntax-reference.md#syntax-reference>`_

-  Responses

:If create table/stream succeed:
  return True

:If failed:
  raise a CreatError(respose_from_ksql_server)
Note that the project description data, including the texts, logos, images, and/or trademarks, for each open source project belongs to its rightful owner. If you wish to add or remove any projects, please contact us at [email protected].