http_requests.py
5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import json
import os
import requests
import pyramid
from threading import Thread
from requests.utils import default_headers
from logging import getLogger
log = getLogger(__name__)
# irul @ 20240529
def join_headers(d):
    """Render a header mapping as a single ``- name: value - name: value`` line.

    Every pair is formatted as `` - key: value`` and the pieces are
    concatenated without any extra separator. Leading and trailing whitespace
    of the final string is stripped, so an empty mapping yields ``""``.
    """
    pieces = []
    for name, value in d.items():
        pieces.append(" - {}: {}".format(name, value))
    return "".join(pieces).strip()
def parse_json(text_message: str):
    """Normalize a message to a JSON string, or return ``None``.

    A ``str`` input is first parsed as JSON; any other input is serialized
    as it is. Values the JSON encoder does not know how to handle are
    converted with ``str`` (``default=str``).

    Args:
        text_message: JSON text, or an already-decoded object.

    Returns:
        The text produced by ``json.dumps``, or ``None`` when the input cannot
        be parsed or serialized.
    """
    try:
        if isinstance(text_message, str):
            text_message = json.loads(text_message)
        return json.dumps(text_message, default=str)
    except Exception:
        # Best-effort helper: any ordinary failure simply means "not JSON".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt, SystemExit
        # and other BaseException subclasses propagate.
        return None
def is_json_rpc(json_rpc_message):
    """Tell whether a message looks like a JSON-RPC 2.x envelope.

    Args:
        json_rpc_message: A dict, or JSON text that is decoded first.

    Returns:
        ``True`` when the (decoded) message is a dict whose ``"jsonrpc"`` key
        is ``"2.0"`` or ``"2.1"``; ``None`` in every other case, including
        input that is not JSON at all.
    """
    if not isinstance(json_rpc_message, dict):
        try:
            json_rpc_message = json.loads(json_rpc_message)
        except (TypeError, ValueError):
            # TypeError: input is not str/bytes (e.g. None or a stream);
            # ValueError: input is not valid JSON.
            return None
        if not isinstance(json_rpc_message, dict):
            # Valid JSON that is a scalar, string or list (e.g. "123") can
            # never be a JSON-RPC envelope; key lookups on it would raise.
            return None
    if json_rpc_message.get('jsonrpc') not in ('2.0', '2.1'):
        # Key missing or unsupported protocol version.
        return None
    return True
def response_hooks(resp, *args, **kwargs):
    """``requests`` response hook that logs the received response.

    When the body parses as JSON, the re-serialized JSON is logged unchanged.
    Otherwise the raw text is cut to 1,000 characters (with a marker appended)
    and its empty lines are removed. A 302 response has its body replaced by a
    fixed mask. The log line is tagged as JSON-RPC or plain HTTP and carries
    the status code, reason, body and headers.
    """
    limit = 1_000

    body = parse_json(resp.text)
    if not body:
        # Not JSON: fall back to the raw text, truncated and compacted.
        body = resp.text
        if len(body) > limit:
            body = body[:limit] + " [response text more than %s characters...]" % limit
        non_empty_lines = [line for line in body.splitlines() if line]
        body = os.linesep.join(non_empty_lines)

    if resp.status_code == 302:
        body = "masked(**Moved Temporarily**)"

    template = (
        "🠈 Recv JSON-RPC Response [%s %s] %s %s"
        if is_json_rpc(body)
        else "🠈 Recv HTTP Response [%s %s] %s %s"
    )
    log.info(template % (resp.status_code, resp.reason, body, join_headers(resp.headers)))
def requests_log(prep):
    """Log an outgoing prepared request: method, URL, body and headers.

    Args:
        prep: The prepared request about to be sent; its ``body``, ``method``,
            ``url`` and ``headers`` attributes are read.

    The log line is tagged as JSON-RPC when the body is a JSON-RPC envelope,
    and as plain HTTP otherwise.
    """
    req_body = prep.body or ""
    if isinstance(req_body, (bytes, bytearray)):
        # Binary payloads (e.g. multipart file uploads) are not valid UTF-8;
        # a strict decode would raise and abort the request just for logging.
        req_body = req_body.decode("utf-8", errors="replace")
    elif hasattr(req_body, "decode"):
        req_body = req_body.decode()

    # Only text can be a JSON-RPC envelope; streamed bodies (file objects,
    # generators) are logged as plain HTTP without being inspected.
    if isinstance(req_body, str) and is_json_rpc(req_body):
        msg_tpl = "🠊 Send JSON-RPC Request [%s %s] %s %s"
    else:
        msg_tpl = "🠊 Send HTTP Request [%s %s] %s %s"
    req_msg = msg_tpl % (prep.method, prep.url, req_body, join_headers(prep.headers))
    log.info(req_msg)
class MyRequests(requests.Session):
    """``requests.Session`` that prefixes a base URL and logs all traffic.

    Every request is logged through ``requests_log`` right before it is sent,
    and every response through the ``response_hooks`` response hook.
    """

    def __init__(self, base_url=""):
        """Create the session.

        Args:
            base_url: String prepended verbatim to the ``url`` of every request.
        """
        super().__init__()
        self._base_url = base_url

    # override
    def request(self, method, url,
                params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=30,
                allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None):
        """Build, log and send a request; return the response.

        Takes the same parameters as ``requests.Session.request``, with these
        differences: ``url`` is appended to the session's base URL,
        ``timeout`` defaults to 30 and ``verify`` defaults to ``False``.

        NOTE(review): with ``verify=False`` TLS certificates are not checked,
        and the matching urllib3 warning is silenced below. Pass
        ``verify=True`` (or a CA bundle path) for anything that is not a
        trusted internal endpoint.

        Raises:
            ValueError: If ``hooks`` names an event ``requests`` does not
                support (raised by ``Request.register_hook``).
        """
        import urllib3

        # Create the Request; the logging hook is always registered first.
        req = requests.Request(
            method=method.upper(),
            url=self._base_url + url,
            headers=headers,
            files=files,
            data=data or {},
            json=json,
            params=params or {},
            auth=auth,
            cookies=cookies,
            hooks={'response': [response_hooks]},
        )
        # Caller-supplied hooks used to be silently dropped; register them
        # after the logging hook, using requests' own normalization (a single
        # callable or an iterable of callables per event).
        for event, hook in (hooks or {}).items():
            req.register_hook(event, hook)

        prep = self.prepare_request(req)

        # Request "hook": requests has no pre-send hook, so log explicitly.
        requests_log(prep)

        settings = self.merge_environment_settings(
            prep.url, proxies or {}, stream, verify, cert
        )
        send_kwargs = {
            "timeout": timeout,
            "allow_redirects": allow_redirects,
        }
        send_kwargs.update(settings)

        # Silence the warning urllib3 emits for unverified HTTPS requests.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # Send the request.
        return self.send(prep, **send_kwargs)
if __name__ == '__main__':
    # Manual smoke test: POST a small JSON payload to httpbin through
    # MyRequests and print the reply.
    url = 'https://httpbin.org'
    payload = {
        "name": "irul",
        "address": "depok",
    }
    headers = default_headers()
    headers.update({"X-USERID": "irul"})
    session = MyRequests(url)
    resp = session.post("/post", json=payload, headers=headers)
    if not resp:
        # A requests Response is falsy for 4xx/5xx status codes.
        # SystemExit is raised directly because the ``exit()`` helper is
        # injected by the ``site`` module and is not guaranteed to exist.
        raise SystemExit(1)
    print("RESP:", resp.text)
def background_task(func):
    """Decorator that runs the decorated function in a daemon thread.

    Calling the decorated function starts a new ``Thread`` running ``func``
    with the given arguments and returns immediately with ``None``; the
    result of ``func`` is not available to the caller.

    Pyramid's current thread-local state is passed to ``func`` as the extra
    keyword argument ``tlocal`` so it can be read from inside the new thread.
    For that to work, ``func`` must accept ``**kwargs`` (or a ``tlocal``
    parameter) and call
    ``pyramid.threadlocal.manager.push(kwargs["tlocal"])`` itself.

    fyi: https://johncylee.github.io/2012/08/30/multi-thread-testing-in-pyramid/
    """
    from functools import wraps

    @wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrap(*args, **kwargs):
        # A plain ``import pyramid`` does not guarantee the ``threadlocal``
        # submodule is loaded, so import it explicitly before using it.
        import pyramid.threadlocal

        # Capture the caller thread's Pyramid thread-local state and hand it
        # to the worker; any ``tlocal`` passed by the caller is overwritten.
        kwargs["tlocal"] = pyramid.threadlocal.manager.get()

        t = Thread(target=func, args=args, kwargs=kwargs)
        t.daemon = True  # automatically terminated if the main app stops/exits
        t.start()

    return wrap