Skip to content

为 CKafka Connector 场景提供数据富化、数据脱敏、数据映射、数据转换、数据过滤等函数模板 #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Python3.6-CkafkaConnectorDataDesensitization/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"serverless-cloud-function-application": {
"Chinese": {
"name": "Ckafka 连接器数据脱敏",
"description": "使用Ckafka 连接器+云函数,提供数据脱敏能力,示例代码演示了将真实手机号进行脱敏的实现,支持功能自定义。",
"attention": "",
"tutorial":"",
"author": {
"name": "腾讯云"
}
},
"English": {
"name": "CkafkaConnectorDataDesensitization",
"description": "This demo will receive Ckafka Connector's messages and do data desensitization job at real time.",
"attention": "",
"tutorial":" ",
"author": {
"name": "Tencent Cloud"
}
},
"runtime": "Python3.6",
"readme": "https://github.com/tencentyun/serverless-demo/tree/master/Python3.6-CkafkaConnectorDataDesensitization",
"version": "1.0.0",
"tags": [
"Python3.6",
"Ckafka",
"Connector",
"ETL"
]
}
}
11 changes: 11 additions & 0 deletions Python3.6-CkafkaConnectorDataDesensitization/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
component: scf
name: ap-guangzhou_default_CkafkaConnectorDataDesensitization
inputs:
name: CkafkaConnectorDataDesensitization
src: ./src
handler: index.main_handler
runtime: Python3.6
namespace: default
region: ap-guangzhou
memorySize: 128
timeout: 30
65 changes: 65 additions & 0 deletions Python3.6-CkafkaConnectorDataDesensitization/src/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-


import re


def main_handler(event, context):
"""
CKafka Connector 传递过来的 event 结构如下:
{
"event": {
"data": [
msg1,
msg2,
msg3
]
}

}
在这里,测试数据结构为:
{
"event": {
"data": [
"13911112222",
"13122223333",
"2345678"
]
}
}
"""
print('start main_handler')
num = len(event['event']['data'])
print('the length of msg body is [%s]'%num)
messages = []
for message in event['event']['data']:
pattern = re.compile(r'^1[3|4|5|7|8][0-9]{9}$', re.I)
searchObj = pattern.match(message) # 判断是否是合法手机号,如果不是,则不进行处理
if searchObj:
# 对手机号进行脱敏,将中间的四位数字换成星号
message = re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", message)
else:
print('[%s] is not a valid phone number'%message)
messages.append(message)
result = {
'result': 'Succeed',
'data': messages
}
# 如果处理失败,可封装错误消息体:
# {"result": "Failed"}
return result


if __name__ == '__main__':
event = {
"event": {
"data": [
"13911112222",
"13122223333",
"2345678"
]
}
}
result = main_handler(event, {})
print(result)
32 changes: 32 additions & 0 deletions Python3.6-CkafkaConnectorDataEnrichment/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"serverless-cloud-function-application": {
"Chinese": {
"name": "Ckafka 连接器数据富化",
"description": "使用Ckafka 连接器+云函数,提供数据富化能力,示例代码演示了数据反序列化、数据库实时查询数据进行数据富化的实现,支持功能自定义。",
"attention": "",
"tutorial":"",
"author": {
"name": "腾讯云"
}
},
"English": {
"name": "CkafkaConnectorETL",
"description": "This demo will receive Ckafka Connector's messages and do data enrichment job at real time.",
"attention": "",
"tutorial":" ",
"author": {
"name": "Tencent Cloud"
}
},
"runtime": "Python3.6",
"readme": "https://github.com/tencentyun/serverless-demo/tree/master/Python3.6-CkafkaConnectorETL",
"version": "1.0.0",
"tags": [
"Python3.6",
"Ckafka",
"Connector",
"ETL",
"MySQL"
]
}
}
22 changes: 22 additions & 0 deletions Python3.6-CkafkaConnectorDataEnrichment/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
component: scf
name: ap-guangzhou_default_CkafkaConnectorDataDesensitization
inputs:
name: CkafkaConnectorDataDesensitization
src: ./src
handler: index.main_handler
runtime: Python3.6
namespace: default
region: ap-guangzhou
memorySize: 128
timeout: 30
environment:
- key: dbhost
value: db host
- key: dbport
value: db port
- key: dbuser
value: db user name
- key: dbpwd
value: db password
- key: dbname
value: db name
112 changes: 112 additions & 0 deletions Python3.6-CkafkaConnectorDataEnrichment/src/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-


import base64
import json
import logging
import os
import re
import time

import pymysql


logger = logging.getLogger()
# Need to add the appropriate configuration in environment variables. 需要在环境变量中添加相应的配置
DB_HOST = os.getenv('dbhost') # 数据库 host
DB_PORT = os.getenv('dbport') # 数据库 port
DB_USER = os.getenv('dbuser') # 数据库 username
DB_USER_PASSWORD = os.getenv('dbpwd') # 数据库密码
DB_NAME = os.getenv('dbname') # 数据库名称


class DBSource(object):
"""
Mysql 数据库作为数据源
"""
def __init__(self):
try:
self.conn = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
passwd=DB_USER_PASSWORD,
db=DB_NAME,
connect_timeout=5)
except Exception as e:
logger.error(e)
logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
raise Exception(str(e))

def fetch(self, sql):
"""
查询记录
"""
with self.conn.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchall()
return result

db_source = DBSource()


def main_handler(event, context):
"""
CKafka Connector 传递过来的 event 结构如下:
{
"event": {
"data": [
msg1,
msg2,
]
}

}
在这里,测试数据结构为:
{
"event": {
"data": [
'{"name": "Anny", "age": 21, "phone": "13111111111"}',
'{"name": "Ernest", "age": 22, "phone": "13111112222"}',
]
}
}
其中,每一条数据都是 json string,包含的信息是用户的基础资料,分别是“姓名”、“年龄”、“手机号码”,
通过数据库查询,例如用手机号作为查询条件,找到数据库中的记录,将数据库中存储的其他字段合并到这条数据中,例如用户的“住址”,完成数据富化操作。
最后,每一条记录都增加一个字段,记录当前处理的时间。
"""
print('start main_handler')
num = len(event['event']['data'])
print('the length of msg body is [%s]'%num)
messages = []
for message in event['event']['data']:
# 将 json string 解析成字典
message = json.loads(message)
table = '<替换成实际的数据表>'
sql = 'SELECT * FROM %s WHERE phone = %s' % (table, message['phone'])
db_record = db_source.fetch(sql)
# 这里即可将 db_record 中的 address 字段添加到 message 中
message['address'] = db_record[0]['address']
message['time'] = time.time()
messages.append(message)
result = {
'result': 'Succeed',
'data': messages
}
# 如果处理失败,可封装错误消息体:
# {"result": "Failed"}
return result


if __name__ == '__main__':
event = {
"event": {
"data": [
'{"name": "Anny", "age": 21, "phone": "13111111111"}',
'{"name": "Ernest", "age": 22, "phone": "13111112222"}',
]
}
}
result = main_handler(event, {})
print(result)
31 changes: 31 additions & 0 deletions Python3.6-CkafkaConnectorDataFilter/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"serverless-cloud-function-application": {
"Chinese": {
"name": "Ckafka 连接器数据过滤",
"description": "使用Ckafka 连接器+云函数,提供数据过滤能力,示例代码演示了过滤真实手机号的实现,支持功能自定义。",
"attention": "",
"tutorial":"",
"author": {
"name": "腾讯云"
}
},
"English": {
"name": "CkafkaConnectorDataFilter",
"description": "This demo will receive Ckafka Connector's messages and do data filtering job at real time.",
"attention": "",
"tutorial":" ",
"author": {
"name": "Tencent Cloud"
}
},
"runtime": "Python3.6",
"readme": "https://github.com/tencentyun/serverless-demo/tree/master/Python3.6-CkafkaConnectorDataFilter",
"version": "1.0.0",
"tags": [
"Python3.6",
"Ckafka",
"Connector",
"ETL"
]
}
}
11 changes: 11 additions & 0 deletions Python3.6-CkafkaConnectorDataFilter/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
component: scf
name: ap-guangzhou_default_CkafkaConnectorDataFilter
inputs:
name: CkafkaConnectorDataFilter
src: ./src
handler: index.main_handler
runtime: Python3.6
namespace: default
region: ap-guangzhou
memorySize: 128
timeout: 30
63 changes: 63 additions & 0 deletions Python3.6-CkafkaConnectorDataFilter/src/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf8 -*-


import re


def main_handler(event, context):
"""
CKafka Connector 传递过来的 event 结构如下:
{
"event": {
"data": [
msg1,
msg2,
msg3
]
}

}
在这里,测试数据结构为:
{
"event": {
"data": [
"13911112222",
"13122223333",
"2345678"
]
}
}
"""
print('start main_handler')
num = len(event['event']['data'])
print('the length of msg body is [%s]'%num)
messages = []
for message in event['event']['data']:
pattern = re.compile(r'^1[3|4|5|7|8][0-9]{9}$', re.I)
searchObj = pattern.match(message)
if searchObj:
messages.append(message)
else:
print('[%s] is not a valid phone number'%message)
result = {
'result': 'Succeed',
'data': messages
}
# 如果处理失败,可封装错误消息体:
# {"result": "Failed"}
return result


if __name__ == '__main__':
event = {
"event": {
"data": [
"13911112222",
"13122223333",
"2345678"
]
}
}
result = main_handler(event, {})
print(result)
Loading