requests是一个简洁易用的http-client库,早期在github的python项目受欢迎程度可以排名TOP10。介绍这个项目,我个人觉得还是官方的地道: Requests is an elegant and simple HTTP library for Python, built for human beings. 夸张到是人类就会使用requests)。我们一起阅读一下其源码,学习它是如何实现的。整篇文档分下面几个部分:

  • 项目结构
  • api 模块
  • sessions 模块
  • models 模块
  • adapters 模块
  • 小技巧

1、项目结构

本次阅读代码版本是 2.24.0, 从github上clone项目后,使用log命令查看历史信息,找到tag=2.24.0的标签,切换版本:

git checkout 0797c61fd541f92f66e409dbf9515ca287af28d2

可以使用下面的方法简单判断一下代码量,这样阅读完成后会更有成就感。

requests git:(0797c61f) ✗ find requests -name "*.py" |xargs cat|grep -v ^$|wc -l  # 4000

大概浏览一下项目结构和代码,我们可以知道每个模块的功能: 名称|描述 —|— adapters.py|负责http连接的处理,主要适配自urllib3库 api|api接口 auth|http认证 certs|https证书处理 compat|python版本适配包 cookies|cookie处理 help|帮助 hook|钩子系统 models|数据模型 packages|兼容包相关 sessions|session处理 status_codes|http状态码 structures|数据结构 utils|工具

4000多行代码,10多个模块,要全部梳理工作量不小,难度也大。本篇文章我们还是只关注主线,对于支线和细枝末节可以 不求甚解

2、api模块

首先还是从requests的使用示例出发:

>>> r = requests.get('https://api.github.com/user', auth=('user', 'pass'))
>>> r.status_code
200
>>> r.headers['content-type']
'application/json; charset=utf8'
>>> r.encoding
'utf-8'
>>> r.text
'{"type":"User"...'
>>> r.json()
{'private_gists': 419, 'total_private_repos': 77, ...}

上面的使用方法由api提供:

# api.py

def request(method, url, **kwargs)
    with sessions.Session() as session:
        return session.request(method=method, url=url, **kwargs)

def get(url, params=None, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, params=params, **kwargs)

这种get-request的api的封装方式,和我们之前读过的redis源码类似,可以让使用者更安全方便。request具体实现代码是从session上下文获取一个session,然后利用 session.request 发送请求。

同时api中还包装了http的 OPTIONS, HEAD, POST, PUT, PATCHDELETE 方法。

3、sessions

sessions.py 对象的创建和上下文:

# sessions.py

class Session(SessionRedirectMixin):
    
    def __init__(self):
        self.headers = default_headers()
        self.cookies = cookiejar_from_dict({})

        # Default connection adapters.
        self.adapters = OrderedDict()
        ...
        self.mount('https://', HTTPAdapter())
    
    def mount(self, prefix, adapter):
        self.adapters[prefix] = adapter
        
    def __enter__(self):
        return self

    def __exit__(self, *args):
        for v in self.adapters.values():
            v.close()

session初始化时候,会创建默认的http-header,http-cookie信息,建立HTTPAdpater对象。__enter____exit__,是上下文装饰器函数,可以用来确保进行adapter的close。

使用request方法发送请求:

def request(self, method, url,
        params=None, data=None, headers=None, cookies=None, files=None,
        auth=None, timeout=None, allow_redirects=True, proxies=None,
        hooks=None, stream=None, verify=None, cert=None, json=None):
    req = Request(
        method=method.upper(),
        url=url,
        headers=headers,
        files=files,
        data=data or {},
        json=json,
        params=params or {},
        auth=auth,
        cookies=cookies,
        hooks=hooks,
    )
    ...
    prep = PreparedRequest()
    prep.prepare(
        method=request.method.upper(),
        url=request.url,
        files=request.files,
        data=request.data,
        json=request.json,
        headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
        params=merge_setting(request.params, self.params),
        auth=merge_setting(auth, self.auth),
        cookies=merged_cookies,
        hooks=merge_hooks(request.hooks, self.hooks),
    )
    ...
    adapter = self.get_adapter(url=request.url)
    ...
    resp = adapter.send(prep, **send_kwargs)
    return resp

request函数的处理流程,主要分成四步:

  1. 使用请求参数封装Request对象
  2. 生成PreparedRequest对象,并对request对象进行预先处理
  3. 获取对应的http/https协议适配器,并用其send方法发送请求
  4. 将获取的Response对象返回

4、models

在进行请求过程中创建了Request,PreparedRequest对象,同时从adpater中返回了Response对象,这3个对象的具体实现都在 models.py 模块。

class Request(RequestHooksMixin):
    
    def __init__(self,
            method=None, url=None, headers=None, files=None, data=None,
            params=None, auth=None, cookies=None, hooks=None, json=None):
        
        ...
        self.hooks = default_hooks()
        for (k, v) in list(hooks.items()):
            self.register_hook(event=k, hook=v)

        self.method = method
        self.url = url
        self.headers = headers
        self.files = files
        self.data = data
        self.json = json
        self.params = params
        self.auth = auth
        self.cookies = cookies
        ...

Request对象创建比较简单,就是做了一些属性的赋值,然后对外部注入的hook进行了一下校验,确保是可以执行的函数和函数集合。

def register_hook(self, event, hook):
    """Properly register a hook."""

    if event not in self.hooks:
        raise ValueError('Unsupported event specified, with event name "%s"' % (event))

    if isinstance(hook, Callable):  ## hook 是一个函数
        self.hooks[event].append(hook)
    elif hasattr(hook, '__iter__'):  # hook 也可以是一个迭代器
        self.hooks[event].extend(h for h in hook if isinstance(h, Callable))

PreparedRequest对象则对外部的参数进行更多的验证和准备:

class PreparedRequest(RequestEncodingMixin, RequestHooksMixin):
    
    ...
    
    def prepare(self,
        method=None, url=None, headers=None, files=None, data=None,
        params=None, auth=None, cookies=None, hooks=None, json=None):
    """Prepares the entire request with the given parameters."""

        self.prepare_method(method)
        self.prepare_url(url, params)
        self.prepare_headers(headers)
        self.prepare_cookies(cookies)
        self.prepare_body(data, files, json)
        self.prepare_auth(auth, url)

        ...
        hooks = hooks or []
        for event in hooks:
            self.register_hook(event, hooks[event])

可以看到PreparedRequest对象经过了:

  • 准备http方法
  • 准备url
  • 准备header
  • 准备cookie
  • 准备http-body
  • 准备认证
  • 接受Request对象上带来的hook

hook我们最后再进行详细介绍,这里以prepare_headers为例看看验证过程中都做了什么:

def prepare_headers(self, headers):
    """Prepares the given HTTP headers."""

    self.headers = CaseInsensitiveDict()  # 创建字典 
    if headers:
        for header in headers.items():
            # Raise exception on invalid header value.
            check_header_validity(header) # 验证信息
            name, value = header
            self.headers[to_native_string(name)] = value  # 赋值

Response对象,主要模拟文件操作,raw保留了二进制数据流,content属性是获得所有二进制数据,text属性将二进制数据编码成文本,json方法则是将文本序列化方法。

CONTENT_CHUNK_SIZE = 10 * 1024 # 10k数据

class Response(object):
    
    def __init__(self):
        #: File-like object representation of response (for advanced usage).
        #: Use of ``raw`` requires that ``stream=True`` be set on the request.
        #: This requirement does not apply for use internally to Requests.
        self.raw = None
    
    @property
    def content(self):
        """Content of the response, in bytes."""
        ...
        self._content = b''.join(self.iter_content(CONTENT_CHUNK_SIZE)) or b''
        ...
        return self._content
    
    @property
    def text(self):
        content = str(self.content, encoding, errors='replace')
        return content
    
    def json(self, **kwargs):
        ...
        return complexjson.loads(self.text, **kwargs) 

requests 优先使用simplejson进行json的序列化

iter_content 函数中使用一个生成器来迭代的从流中获取数据。至于流如何得到,稍后看adapter的实现。

def iter_content(self, chunk_size=1, decode_unicode=False):
    def generate():
            # Special case for urllib3.
            if hasattr(self.raw, 'stream'):
                try:
                    for chunk in self.raw.stream(chunk_size, decode_content=True):
                        yield chunk
    stream_chunks = generate()
    return stream_chunks

5、adapters 模块

具体的http请求如何发送的呢?主要就在HTTPAdapter中了:

class HTTPAdapter(BaseAdapter):
    def __init__(self, pool_connections=DEFAULT_POOLSIZE,
                 pool_maxsize=DEFAULT_POOLSIZE, max_retries=DEFAULT_RETRIES,
                 pool_block=DEFAULT_POOLBLOCK):
        ...
        # 初始化连接池
        self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize,
                                       block=block, strict=True, **pool_kwargs)
    
    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        conn = self.poolmanager.connection_from_url(url) # 获取连接
        
        url = self.request_url(request, proxies)
        self.add_headers(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies)
        
        # 发送请求
        resp = conn.urlopen(
                    method=request.method,
                    url=url,
                    body=request.body,
                    headers=request.headers,
                    redirect=False,
                    assert_same_host=False,
                    preload_content=False,
                    decode_content=False,
                    retries=self.max_retries,
                    timeout=timeout
                )
        
        return self.build_response(request, resp)
    
    def close(self):
        self.poolmanager.clear()  # 连接池关闭

这里主要用了urllib3库提供的PoolManager和urlopen,本篇文章我们就不深入里面的实现了,重点看看如何生成Response对象:

def build_response(self, req, resp):
    response = Response()
    
    # Fallback to None if there's no status_code, for whatever reason.
    response.status_code = getattr(resp, 'status', None)

    # Make headers case-insensitive.
    response.headers = CaseInsensitiveDict(getattr(resp, 'headers', {}))

    # Set encoding.
    response.encoding = get_encoding_from_headers(response.headers)
    response.raw = resp  # 二进制流
    response.reason = response.raw.reason

    if isinstance(req.url, bytes):
        response.url = req.url.decode('utf-8')
    else:
        response.url = req.url

    # Add new cookies from the server.
    extract_cookies_to_jar(response.cookies, req, resp)

    # Give the Response some context.
    response.request = req
    response.connection = self

    return response
  • resp 是urllib3的HTTPResponse实现
  • cookie是合并了Request和Response
  • Response还引用了PreparedRequest对象,可以让response的使用更方便

使用requests进行http请求的过程,主要集中在上面四个模块,现在对其核心过程都有了一定的了解。https则是再http基础上,做了更多的验证等工作。可以简单回顾一下请求执行流程:

  1. api中封装易用的API
  2. Session中进行流程的处理
  3. Request和PreparedRequest对请求进行预处理
  4. Response对响应进行封装,提供更易用的方法(json)和数据(ok)

6、小技巧

requests库中还有一些代码,也让使用更简单,可以借鉴。

6.1 json缩进输出

json输出的时候定义indent参数可以进行缩进,sort_keys可以进行排序。

# help.py

"""Pretty-print the bug information as JSON."""
print(json.dumps(info(), sort_keys=True, indent=2))

下面是示例和展示:

a = {
        "name": "game404",
        "age": 2
    }
print(json.dumps(a)) 
print(json.dumps(a, sort_keys=True, indent=2))  # 定义indent参数
# 输出
{"name": "game404", "age": 2}
{
  "age": 2,
  "name": "game404"
}

6.2 structures

structures模块中定义了2个数据结构。普通的python字典不可以使用 . 取值, 如果需要使用 . 需要定义对象:

# structures.py

a = {
    "name":"game404"
}
# print(a.name)  # AttributeError
print(a["name"])

# 定义一个数据结构对象
class Person(object):

    def __init__(self, name):
        self.name = name

LookupDict 可以不用定义对象属性又使用. 取值,这在一些配置类上会很方便:

class LookupDict(dict):
    """Dictionary lookup object."""

    def __init__(self, name=None):
        self.name = name
        super(LookupDict, self).__init__()

    def __repr__(self):
        return '<lookup \'%s\'>' % (self.name)

    def __getitem__(self, key):
        # We allow fall-through here, so values default to None
        # 可以使用. 取值的魔法函数
        return self.__dict__.get(key, None)

    def get(self, key, default=None):
        return self.__dict__.get(key, default
        
a = LookupDict(name="game404")
a["motto"] = "Life is short, you need Python"
a.age = 2
print(a["motto"], a.age, a["age"])  # none, 2, 2

CaseInsensitiveDict 定义了大小写不敏感的字典,用来处理http-header:

class CaseInsensitiveDict(MutableMapping):
    
    def __init__(self, data=None, **kwargs):
        self._store = OrderedDict()  # 使用额外的_store存储数据
        if data is None:
            data = {}
        self.update(data, **kwargs)

    def __setitem__(self, key, value):
        # Use the lowercased key for lookups, but store the actual
        # key alongside the value.
        self._store[key.lower()] = (key, value)  # 字典的key都转换为小写 
    
    def __delitem__(self, key):
        del self._store[key.lower()]
    
cid = CaseInsensitiveDict()
cid['Accept'] = 'application/json'
print(cid['aCCEPT'] == 'application/json')  # True

可以看到CaseInsensitiveDict对象的__dict__实际上使用_store包装了一层:

print(cid.__dict__)  # {'_store': OrderedDict([('accept', ('Accept', 'application/json'))])} 
print(cid._store)  # OrderedDict([('accept', ('Accept', 'application/json'))])

6.3 status_codes

status_codes中定义了http状态码的语义化名称,比如 OK200的语义化表达,不懂http的人也可以看到ok状态。

print(requests.codes["ok"], requests.codes.OK, requests.codes.ok, requests.codes.OKAY)  #200 200 200 200
print(requests.codes.CREATED)  # 201
print(requests.codes.found)  # 302

其实现方法主要是:

# statuc_codes.py

codes = LookupDict(name='status_codes')
for code, titles in _codes.items():
        for title in titles:
            setattr(codes, title, code)  # 默认key 
            if not title.startswith(('\\', '/')):
                setattr(codes, title.upper(), code)  # 大写key

6.4 hook

hooks 提供了一个简单的钩子系统,可以对一个事件名称注册多个处理函数(前面的register_hook),然后在合适的时候触发就可以获取对数据进行处理, 数据处理过程类似linux的管道符号 | :

# hooks.py

HOOKS = ['response']


def default_hooks():  # 初始化默认的事件
    return {event: [] for event in HOOKS}

def dispatch_hook(key, hooks, hook_data, **kwargs):
    """Dispatches a hook dictionary on a given piece of data."""
    hooks = hooks or {}
    hooks = hooks.get(key)
    if hooks:
        if hasattr(hooks, '__call__'):  # 判断是函数还是函数集合
            hooks = [hooks]
        for hook in hooks:
            _hook_data = hook(hook_data, **kwargs)  # 注意hook会返回数据,由下一个函数继续处理
            if _hook_data is not None:
                hook_data = _hook_data
    return hook_data

使用方法在:

class Session(SessionRedirectMixin):

    def send(self, request, **kwargs):
        ...
        r = adapter.send(request, **kwargs)
        # Response manipulation hooks
        r = dispatch_hook('response', hooks, r, **kwargs)

session在获取到请求后,触发预先定义的钩子,对response进行进一步的处理。

参考链接

  • https://requests.readthedocs.io/zh_CN/latest/
  • https://urllib3.readthedocs.io/en/latest/
  • https://gist.github.com/kennethreitz42/973705

转载自:https://mp.weixin.qq.com/s/g8CYXC4bsAwnIzU7w4bpVg