
sandabuliu / Python Stream

A more elegant way to process streaming data


Projects that are alternatives to or similar to Python Stream

Athenax
SQL-based streaming analytics platform at scale
Stars: ✭ 1,178 (+5790%)
Mutual labels:  stream, data
Gopherlabs
Go - Beginners | Intermediate | Advanced
Stars: ✭ 205 (+925%)
Mutual labels:  structure, data
Gollum
An n:m message multiplexer written in Go
Stars: ✭ 883 (+4315%)
Mutual labels:  stream, log
Baize
Baize automated operations platform: configuration management, network probing, asset management, business management, CMDB, CD, DevOps, job orchestration, task orchestration, and more; monitoring, alerting, log analysis, and big-data analytics are planned.
Stars: ✭ 296 (+1380%)
Mutual labels:  data, log
aushape
A library and a tool for converting audit logs to XML and JSON
Stars: ✭ 37 (+85%)
Mutual labels:  stream, log
Illuminati
A platform that collects all the data occurring in your application and shows it in real time using Kibana or other tools.
Stars: ✭ 106 (+430%)
Mutual labels:  stream, log
Rain
Visualize vertical data inside your terminal 💦
Stars: ✭ 84 (+320%)
Mutual labels:  data, log
Tlog
Terminal I/O logger
Stars: ✭ 170 (+750%)
Mutual labels:  stream, log
Typed Immutable
Immutable and structurally typed data
Stars: ✭ 263 (+1215%)
Mutual labels:  structure, data
Mithril Data
A rich data model library for Mithril javascript framework
Stars: ✭ 17 (-15%)
Mutual labels:  stream, data
Remote Web Streams
Web streams that work across web workers and iframes.
Stars: ✭ 26 (+30%)
Mutual labels:  stream
Databook
A facebook for data
Stars: ✭ 26 (+30%)
Mutual labels:  data
Samples Viewer Generator
🎉 A CLI utility tool to generate a web app of data visualization samples for presentation purposes
Stars: ✭ 13 (-35%)
Mutual labels:  data
Modelassistant
Elegant library to manage the interactions between view and model in Swift
Stars: ✭ 26 (+30%)
Mutual labels:  data
Unityprojecttreegenerator
This script generates a universal folder structure for your Unity3D project.
Stars: ✭ 12 (-40%)
Mutual labels:  structure
Dendro
"Open-source Dropbox" with added description features. It is a data storage and description platform designed to help researchers and other users to describe their data files, built on Linked Open Data and ontologies. Users can use Dendro to publish data to CKAN, Zenodo, DSpace or EUDAT's B2Share and others.
Stars: ✭ 25 (+25%)
Mutual labels:  data
Trie
A Mixed Trie and Levenshtein distance implementation in Java for extremely fast prefix string searching and string similarity.
Stars: ✭ 25 (+25%)
Mutual labels:  stream
Chunk Store Stream
Convert an abstract-chunk-store compliant store into a readable or writable stream
Stars: ✭ 24 (+20%)
Mutual labels:  stream
Go Mesh
Realtime data exchange platform for Smart Cities
Stars: ✭ 20 (+0%)
Mutual labels:  data
React Styleguide
ReactJS style guide for component-based projects.
Stars: ✭ 14 (-30%)
Mutual labels:  structure

python-stream

Description

A streaming-data framework for use cases such as data cleaning, data preprocessing, and data migration.

A more elegant way to process streaming data.

Installation


pip install git+https://github.com/sandabuliu/python-stream.git

or

git clone https://github.com/sandabuliu/python-stream.git
cd python-stream
python setup.py install

QuickStart
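
Pipelines are assembled with the | operator: a source on the left, executors in the middle, and optionally an output on the right. Iterating over the resulting object (or calling its start() method) drives the stream. A minimal sketch using only components that appear elsewhere in this README:

from pystream.executor.source import Memory
from pystream.executor.executor import Map

# Square each item from an in-memory source.
s = Memory([1, 2, 3, 4]) | Map(lambda x: x * x)
for item in s:
    print(item)  # 1, 4, 9, 16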


Examples

Word Count
from pystream.executor.source import Memory
from pystream.executor.executor import Map, Iterator, ReducebyKey
    
data = Memory([
    'Wikipedia is a free online encyclopedia, created and edited by volunteers around the world and hosted by the Wikimedia Foundation.',
    'Search thousands of wikis, start a free wiki, compare wiki software.',
    'The official Wikipedia Android app is designed to help you find, discover, and explore knowledge on Wikipedia.'
])
p = data | Map(lambda x: x.split(' ')) | Iterator(lambda x: (x.strip('.,'), 1)) | ReducebyKey(lambda x, y: x+y)
result = {}
for key, value in p:
    result[key] = value
print(result.items())

Output:

[('and', 3), ('wiki', 2), ('compare', 1), ('help', 1), ('is', 2), ('Wikipedia', 3), ('discover', 1), ('hosted', 1), ('Android', 1), ('find', 1), ('Foundation', 1), ('knowledge', 1), ('to', 1), ('by', 2), ('start', 1), ('online', 1), ('you', 1), ('thousands', 1), ('app', 1), ('edited', 1), ('Search', 1), ('around', 1), ('free', 2), ('explore', 1), ('designed', 1), ('world', 1), ('The', 1), ('the', 2), ('a', 2), ('on', 1), ('created', 1), ('Wikimedia', 1), ('official', 1), ('encyclopedia', 1), ('of', 1), ('wikis', 1), ('volunteers', 1), ('software', 1)]
Calculating π
A Monte Carlo estimate: points drawn uniformly from the square [-1, 1]² land inside the unit circle with probability π/4, so four times the running hit ratio converges to π.
from random import random
from pystream.executor.source import Faker
from pystream.executor.executor import Executor, Map, Group

class Pi(Executor):
    def __init__(self, **kwargs):
        super(Pi, self).__init__(**kwargs)
        self.counter = 0
        self.result = 0

    def handle(self, item):
        self.counter += 1
        self.result += item
        return 4.0*self.result/self.counter

s = Faker(lambda: random(), 100000) | Map(lambda x: x*2-1) | Group(size=2) | Map(lambda x: 1 if x[0]**2+x[1]**2 <= 1 else 0) | Pi()

res = None
for _ in s:
    res = _
print(res)

Output:

3.14728
Sorting
Sort yields the accumulated, sorted sequence after each new element arrives:
from random import randint
from pystream.executor.source import Memory
from pystream.executor.executor import Sort
m = Memory([randint(0, 100) for i in range(10)]) | Sort()

for i in m:
    print(list(i))

Output:

[94]
[94, 99]
[18, 94, 99]
[18, 40, 94, 99]
[18, 26, 40, 94, 99]
[18, 26, 40, 63, 94, 99]
[18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 83, 94, 99]
[3, 16, 18, 26, 40, 63, 83, 83, 94, 99]
Using with Hadoop
The Stdin source and Stdout output let a pipeline run as a Hadoop Streaming mapper or reducer.
Word count
mapper.py
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, Iterator
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split()) | Iterator(lambda x: "%s\t1" % x) | Stdout()
s.start()
reducer.py
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, ReducebySortedKey
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split('\t')) | ReducebySortedKey(lambda x, y: int(x)+int(y)) | Map(lambda x: '%s\t%s' % x) | Stdout()
s.start()
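These scripts can then be submitted via Hadoop Streaming. A hedged sketch of the invocation; the jar path and HDFS paths are assumptions that vary by distribution, and pystream must be installed on the task nodes:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input /user/hadoop/wordcount/input \
    -output /user/hadoop/wordcount/output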
Parsing NGINX logs
from pystream.config import rule
from pystream.executor.source import File
from pystream.executor.executor import Parser
s = File('/var/log/nginx/access.log') | Parser(rule('nginx'))

for item in s:
    print(item)

Output:

{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
Exporting database data
from sqlalchemy import create_engine
from pystream.executor.source import SQL
from pystream.executor.output import Csv
from pystream.executor.wraps import Batch

engine = create_engine('mysql://root:password@127.0.0.1:3306/test')  # replace credentials with your own
conn = engine.connect()
s = SQL(conn, 'select * from faker') | Batch(Csv('/tmp/output'))

for item in s:
    print(item['data'])
    print(item['exception'])
conn.close()

Data Sources

Reading file data
from pystream.executor.source import Tail, File, Csv
Tail('/var/log/nginx/access.log')
File('/var/log/nginx/*.log')
Csv('/tmp/test*.csv')
Reading TCP stream data
from pystream.executor.source import TCPClient
TCPClient('/tmp/pystream.sock')
TCPClient(('127.0.0.1', 10000))
Reading Python data
from Queue import Queue as Q
from random import randint
from pystream.executor.source import Memory, Faker, Queue
queue = Q(10)

Memory([1, 2, 3, 4])
Faker(lambda: randint(0, 100), 1000)  # randint needs arguments, so wrap it in a zero-argument callable
Queue(queue)
Reading data from common modules
from pystream.executor.source import SQL, Kafka
SQL(conn, 'select * from faker')   # read rows from a database
Kafka('topic1', '127.0.0.1:9092')  # read messages from Kafka
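
Any of these sources can head a pipeline in the same way. A minimal sketch, reusing the Tail source and the Stdout output shown elsewhere in this README:

from pystream.executor.source import Tail
from pystream.executor.output import Stdout

# Follow the log file and print each new line as it arrives.
s = Tail('/var/log/nginx/access.log') | Stdout()
s.start()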

Data Outputs

Writing to files
from pystream.executor.output import File, Csv
File('/tmp/output')
Csv('/tmp/output.csv')
Output via HTTP
from pystream.executor.output import HTTPRequest
HTTPRequest('http://127.0.0.1/api/data')
Output to Kafka
from pystream.executor.output import Kafka
Kafka('topic', '127.0.0.1:9092')
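
Any of these outputs can terminate a pipeline. A minimal sketch writing an in-memory stream to a file, assuming (as the Stdout examples suggest) that string items are written one per record:

from pystream.executor.source import Memory
from pystream.executor.output import File

# Write three records to /tmp/output.
s = Memory(['line-1', 'line-2', 'line-3']) | File('/tmp/output')
s.start()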

Middleware

Queue
from pystream.executor.source import Tail
from pystream.executor.output import Stdout
from pystream.executor.middleware import Queue

s = Tail('/Users/tongbin01/PycharmProjects/python-stream/README.md') | Queue() | Stdout()
s.start()
Subscribe
Subscribe splits a stream into channels keyed by the first element of each item; a downstream pipeline attaches to a channel by key (here sub['1']).
from random import randint
from pystream.executor.source import Tail
from pystream.executor.executor import Map
from pystream.executor.output import Stdout

from pystream.executor.middleware import Subscribe
from pystream.executor.wraps import Daemonic

sub = Tail('/var/log/messages') | Map(lambda x: (str(randint(1, 2)), x.strip())) | Subscribe()
Daemonic(sub).start()

s = sub['1'] | Map(lambda x: x.strip()) | Stdout()
s.start()

TodoList

  • Client timeout handling for the subscriber (Subscribe)
  • Parallel computation
  • Asynchronous HTTP output / asynchronous sources
  • Add more basic outputs / sources
  • Add support for other common systems, such as Redis, Kafka, Flume, Logstash, and various databases

Copyright © 2017 [email protected]
