侧边栏壁纸
  • 累计撰写 23 篇文章
  • 累计创建 12 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

接口自动化扫描服务实现-apiScanServer

usoo
2024-07-17 / 0 评论 / 0 点赞 / 159 阅读 / 0 字

背景

在当前团队中,我们测试不同的业务不同的需求之间往往有大量重复的测试用例,比如所有业务所有服务涉及的身份认证、签名校验都是同一套实现逻辑,开发可以直接复用的逻辑,但从测试角度所有功能实现下来都需要校验。为解决这个痛点问题,我实现了apiScanServer接口自动化扫描策略服务并记录下来,后面工作中遇到类似问题可以来回顾下。

接口自动化检测流程

  • 流程说明:开发在服务提测前,需要将服务所有的对外接口和内部接口在apiScanServer进行策略扫描,只有所有扫描策略通过后,才允许提测,需要在提测单贴上扫描结果图。
  • 交互图如下
    apiscan

数据库设计

  • 由于接口自动化检测服务主要获取的是扫描结果,不需要永久性存储,而且数据量大还会影响查询速率,所以采用redis数据库存储扫描结果。
  • redis表结构设计
key value
scan_id {“result”: True/False, “scan_strategy”: [“sqlInjection”, “xss”], “fail_data”: [{“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}, {“attack”: “or 1=1”, “scan_key”: “kk”, “status_code”: 403, “res”: “text”}]}

接口设计

1. 单接口扫描

编码:json
协议:http
path:/scan
host:127.0.0.1
method:post
  • 入参:headers
字段 必填/选填 描述
Content-Type 必填 值为application/json
  • 入参:json
字段 必填/选填 字段类型 描述
url 必填 string url 包括协议、host、port、path
method 必填 string 请求方式 post/get/patch/put
headers 选填 dict {} cookie、sign、application等必要元素都需要描述在headers中
json 选填 dict{}
params 选填 dict{}
  • 请求demo
{"url": "http://note-api.wps.cn/v3/notesvr/set/notegroup", "method": "post", "headers": {"Content-Type": "application/json", "X-user-key": 2819371, "Cookie": {"wps_sid": "12312nhuoashdoahdsa"}}, "json": {"groupId": "text", "groupName": "qwhueqhdasid", "order": 0}}
  • 返回体
字段 字段类型 描述
scan_id string 扫描任务的唯一标识
scan_status bool True 说明符合扫描规则,False不符合扫描规则

2. 获取扫描结果接口

编码:json
协议:http
path:/scan_result
host:127.0.0.1
method:get
  • 入参:queryParams
字段 必填/选填 描述
scan_id 必填 扫描任务的唯一标识
  • 返回体
字段 字段类型 描述
scan_id string 扫描任务的唯一标识
status string no scanning/scan success,返回当前的扫描状态
result bool True扫描未发现问题,False扫描存在异常
scan_strategy list 已开启的扫描策略
fail_data list[{}] 扫描失败的场景

python代码实现

扫描单接口

@app.route('/scan', methods = ['POST'])
def api_scan():
   # 入参协议校验
   try:
       validate(instance = request.json, schema = api_scan_schema)
   except jsonschema.exceptions.ValidationError as err:
       print(f'{request.remote_addr} request json error, err msg: {err}!')
       return jsonify({"scan_id": None, "scan_status": False}), 400
   # 生成scan_id
   create_scan_id = str(int(time.time() * 1000)) + '_id'
   # 基准测试
   if 'params' in request.json.keys():
       data_type = 'params'
       re_data = request.json['params']
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'],
                              params = request.json['params'])
   elif 'json' in request.json.keys():
       data_type = 'json'
       re_data = request.json['json']
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'],
                              json = request.json['json'])
   else:
       data_type = None
       re_data = None
       res = requests.request(method = request.json['method'], url = request.json['url'],
                              headers = request.json['headers'])
   if res.status_code != 200:
       return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403
   # 执行扫描策略
   def scan(scan_id, method, url, headers, data, data_type):
       all_scan_res = []
       if strategy_config['sqlInjection']:
           # sql注入扫描
           all_scan_res.append(sql_injection_scan(method = method, url = url,
                                                  headers = headers,
                                                  data = data, data_type = data_type))
       # xss扫描
       if strategy_config['xssCheck']:
           all_scan_res.append(xss_check_scan(method = method, url = url,
                                              headers = headers,
                                              data = data, data_type = data_type))
       # 扫描完成后
       redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex = 3600 * 24)
   t = Thread(target = scan, args = (
       create_scan_id, request.json['method'], request.json['url'], request.json['headers'], re_data, data_type))
   t.start()
   return jsonify({"scan_id": create_scan_id, "scan_status": True}), 200

获取扫描结果接口

@app.route('/scan_result', methods = ['GET'])
def scan_result():
   scan_id = request.args['scan_id']
   res = redis_conn.get(f'scan_id:{scan_id}')
   if res is not None:
       res = json.loads(res.decode('utf-8'))
   else:
       return jsonify({"result": False, "status": "no scanning", "fail_data": res}), 200
   if res is []:
       result = True
   else:
       result = False
   return jsonify({"result": result, "status": "scan success", "fail_data": res}), 200

sql注入漏洞的扫描

将接口各入参进行sql注入字典集的遍历,如果存在sql注入漏洞,接口返回体可能会包含mysql的报错信息、返回多条数据。

def sql_injection_scan(method, url, headers=None, data=None, data_type=None):
   if data is None:
       return []
   with open(file=DIR + '\\scan_business\\sqlInjectDb.txt', mode='r', encoding='utf-8') as f:
       attack_f = f.readlines()
   with open(file=DIR + '\\scan_business\\api_checkDB.txt', mode='r', encoding='utf-8') as f2:
       check_list = f2.readlines()
   data = deepcopy(data)
   fail_data = []
   # 遍历每一个字段
   for k in data.keys():
       # 遍历每一个攻击语句
       for attack in attack_f:
           data[k] = attack
           # 接口参数篡改好后,发起请求
           if data_type == 'params':
               res = requests.request(method=method, url=url, params=data, headers=headers)
           else:
               res = requests.request(method=method, url=url, json=data, headers=headers)
           # 接口结果分析
           for i in check_list:
               if i in res.text:
                   fail_data.append(
                       {"attack": attack, "scan_key": k, "status_code": res.status_code, "res": res.text})
   return fail_data
0

评论区