Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python学习笔记: hmac算法 #13

Open
AlexZ33 opened this issue Apr 1, 2020 · 4 comments
Open

Python学习笔记: hmac算法 #13

AlexZ33 opened this issue Apr 1, 2020 · 4 comments

Comments

@AlexZ33
Copy link
Member

AlexZ33 commented Apr 1, 2020

https://www.liaoxuefeng.com/wiki/1016959663602400/1183198304823296

通过哈希算法,我们可以验证一段数据是否有效,方法就是对比该数据的哈希值,例如,判断用户口令是否正确,我们用保存在数据库中的password_md5对比计算md5(password)的结果,如果一致,用户输入的口令就是正确的。

为了防止黑客通过彩虹表根据哈希值反推原始口令,在计算哈希的时候,不能仅针对原始输入计算,需要增加一个salt来使得相同的输入也能得到不同的哈希,这样,大大增加了黑客破解的难度。

如果salt是我们自己随机生成的,通常我们计算MD5时采用md5(message + salt)。但实际上,把salt看做一个“口令”,加salt的哈希就是:计算一段message的哈希时,根据不通口令计算出不同的哈希。要验证哈希值,必须同时提供正确的口令。

这实际上就是Hmac算法:Keyed-Hashing for Message Authentication。它通过一个标准算法,在计算哈希的过程中,把key混入计算过程中。

和我们自定义的加salt算法不同,Hmac算法针对所有哈希算法都通用,无论是MD5还是SHA-1。采用Hmac替代我们自己的salt算法,可以使程序算法更标准化,也更安全。

Python自带的hmac模块实现了标准的Hmac算法。我们来看看如何使用hmac实现带key的哈希。

我们首先需要准备待计算的原始消息message,随机key,哈希算法,这里采用MD5,使用hmac的代码如下:

>>> import hmac
>>> message = b'Hello, world!'
>>> key = b'secret'
>>> h = hmac.new(key, message, digestmod='MD5')
>>> # 如果消息很长,可以多次调用h.update(msg)
>>> h.hexdigest()
'fa4ee7d173f2d97ee79022d1a7355bcf'

可见使用hmac和普通hash算法非常类似。hmac输出的长度和原始哈希算法的长度一致。需要注意传入的key和message都是bytes类型,str类型需要首先编码为bytes。

@AlexZ33
Copy link
Member Author

AlexZ33 commented Apr 1, 2020

auth.py

import requests
import time
import datetime
import hmac
import hashlib
import base64
import json
from urllib.parse import urlparse


class Client():

    _STRING_TO_SIGN = "{method}\n{host}\n{uri}\n{timestamp}\n"
    _AUTH_HEADER = "APIKey={api_key},Signature={sig},Timestamp={timestamp}"

    def __init__(self, AK, SK):
        self.ak = AK
        self.sk = SK

    def _wrapper_header(self, url, headers):
        # get time
        TIMESTAMP = datetime.datetime.utcnow().isoformat() + "+00:00"

        parsed_url = urlparse(url)
        host = parsed_url.netloc
        uri = parsed_url.path
        # print("netloc:", parsed_url.netloc)
        string_to_sign = self._STRING_TO_SIGN.format(
            method="GET",
            host=parsed_url.netloc,
            uri=uri,
            timestamp=TIMESTAMP
        )

        headers_default = {
            "User-Agent": "default",
            "Content-Type": "application/json;charset=UTF-8"
        }

        for header in headers_default:
            if header in headers:
                headers_default[header] = headers[header]

        for key in sorted(headers_default):
            string_to_sign += headers_default[key] + "\n"

        # print(string_to_sign)
        key = bytes(self.sk, 'UTF-8')
        raw_sig = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).digest()
        # raw_sig = bytes(raw_sig, 'UTF-8')
        encoded_sig = base64.urlsafe_b64encode(raw_sig)

        encoded_sig = str(encoded_sig, 'UTF-8')
        # print(encoded_sig)
        headers_default['Authorization'] = self._AUTH_HEADER.format(
            api_key = self.ak,
            sig = encoded_sig,
            timestamp = TIMESTAMP
        )

        return headers_default

    def request(self, method, url, headers={}, body=""):
        action_d = {
            "get": requests.get,
            "post": requests.post,
            "put": requests.put,
            "delete": requests.delete,
            "patch": requests.patch,
        }
        headers = self._wrapper_header(url, headers)
        if method == "get":
            resp = action_d["get"](url, headers=headers)
        else:
            if method not in action_d:
                return "This method is not support."
            resp = action_d["get"](url, headers=headers, json=body)
        return resp

@AlexZ33
Copy link
Member Author

AlexZ33 commented Apr 1, 2020

test.py

from request_auth import Client
import requests


AK = ''
SK = ''


def main():
    client = Client(AK, SK)
    url = "url"
    headers = {
        "jx": "jx"
    }
    # request(self, method, url, headers={}, body="")
    resp = client.request('get', url, headers)
    print(resp.text)


if __name__ == '__main__':
    main()

@AlexZ33
Copy link
Member Author

AlexZ33 commented Apr 1, 2020

auth.go

package auth

import (
     "fmt"
    "net/http"
    "io/ioutil"
    "strings"
    "bytes"
    "sort"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "time"
)

const (
	// common parameters
       // 公用参数
	authorizationHeader = "Authorization"
	apiKeyParam         = "APIKey"
	signatureParam      = "Signature"
	timestampParam      = "Timestamp"

	// parsing bits 需要转换为bytes类型的参数
	empty   = ""
	comma   = ","
	space   = " "
	eqSign  = "="
	newline = "\n"
)

type Wrapper struct {
    Ak string
    Sk string
}

func signString(str, ak, sk, timestamp string) string {
	hash := hmac.New(sha256.New, []byte(sk))
	hash.Write([]byte(str))
	signature := base64.URLEncoding.EncodeToString(hash.Sum(nil))
    return "APIKey=" + ak + ",Signature=" + signature + ",Timestamp=" + timestamp
}

func stringToSign(request *http.Request, timestamp string) string {
	var buffer bytes.Buffer

    uriList := strings.Split(request.URL.RequestURI(), "?")
    uriNoArgs := uriList[0]
	// Standard
	buffer.WriteString(request.Method)
	buffer.WriteString(newline)
	buffer.WriteString(request.Host)
	buffer.WriteString(newline)
	buffer.WriteString(uriNoArgs)
    // fmt.Println(request.URL.RequestURI())
	buffer.WriteString(newline)
	buffer.WriteString(timestamp)
	buffer.WriteString(newline)

    // new Headers
    headerOrig := map[string]string{
        "User-Agent": "default",
        "Content-Type": "application/json;charset=UTF-8",
    }
    for k, v := range headerOrig {
        v1 := request.Header.Get(k)
        if v1 == "" {
            headerOrig[k] = v
        }
    }
    keys := make([]string, 0, len(headerOrig))
	for k, v := range headerOrig {
        request.Header.Set(k, v)
		keys = append(keys, k)
	}
    sort.Strings(keys)

    for _, k := range keys {
        header := headerOrig[k]
        buffer.WriteString(header)
        buffer.WriteString(newline)
    }

	return buffer.String()
}

func (w Wrapper) WrapRequest(request *http.Request) {
    currentTime := time.Now().UTC().String()
    timeArray := strings.Split(currentTime, space)
    timeHour := timeArray[1][0 : len(timeArray[1])-3]
    timestamp := timeArray[0] + "T" + timeHour + "+00:00"

    str := stringToSign(request, timestamp)
    authHeader := signString(str, w.Ak, w.Sk, timestamp)
    request.Header.Set("Authorization", authHeader)
}

@AlexZ33
Copy link
Member Author

AlexZ33 commented Apr 1, 2020

main.go

package main

import (
    " auth"  // 上面代码github地址
    "net/http"
    "io/ioutil"
    "strings"
    "fmt"
)

const (
    ak = ""
    sk = ""
)

func main() {
    client := &http.Client{}

    url := "url"
    method := "GET"
    data := ""
    body := strings.NewReader(data)
    request, _ := http.NewRequest(method, url, body)

    wrapper := wrap.Wrapper{
        Ak: ak,
        Sk: sk,
    }
    wrapper.WrapRequest(request)

    response, _ := client.Do(request)
    defer response.Body.Close()
    respBody, _ := ioutil.ReadAll(response.Body)
    fmt.Printf("response data:%v\n",string(respBody))
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant