diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..56a6cef --- /dev/null +++ b/.gitattributes @@ -0,0 +1,11 @@ +# Exclude non-essential files from dist +/.github export-ignore +/bin export-ignore +/doc export-ignore +/examples export-ignore +/res export-ignore +/tests export-ignore +.php_cs.dist export-ignore +phpstan.neon export-ignore +phpunit.xml.dist export-ignore +swoole-phpunit export-ignore \ No newline at end of file diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml index ae16083..e630f21 100644 --- a/.github/docker-compose.yml +++ b/.github/docker-compose.yml @@ -13,19 +13,20 @@ services: - zookeeper environment: KAFKA_BROKER_ID: 1 - KAFKA_ADVERTISED_HOST_NAME: kafka1 - KAFKA_ADVERTISED_PORT: 9092 - KAFKA_HOST_NAME: kafka1 - KAFKA_PORT: 9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 HOSTNAME_COMMAND: hostname -i - KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093 + KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093,SASL_SSL://kafka1:9094,SSL://kafka1:9095 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093,SASL_SSL://kafka1:9094,SSL://kafka1:9095 KAFKA_NUM_PARTITIONS: 3 KAFKA_SASL_ENABLED_MECHANISMS: PLAIN KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN - # KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT + KAFKA_SSL_KEYSTORE_LOCATION: "/kafka-client/.github/kafka/cert/kafka1/server.keystore.jks" + KAFKA_SSL_KEYSTORE_PASSWORD: phpkafka123456 + KAFKA_SSL_KEY_PASSWORD: phpkafka123456 + KAFKA_SSL_TRUSTSTORE_LOCATION: "/kafka-client/.github/kafka/cert/server.truststore.jks" + KAFKA_SSL_TRUSTSTORE_PASSWORD: phpkafka123456 +# KAFKA_SSL_CLIENT_AUTH: "required" KAFKA_OPTS: "-Djava.security.auth.login.config=/kafka-client/.github/kafka/jaas.conf" command: "/kafka-client/.github/kafka/start_kafka.sh" volumes: @@ -33,6 +34,8 @@ services: ports: - "9092:9092" - "9093:9093" + - "9094:9094" + - "9095:9095" kafka2: container_name: kafka2 @@ -41,26 +44,29 @@ services: - zookeeper environment: KAFKA_BROKER_ID: 2 - KAFKA_ADVERTISED_HOST_NAME: kafka2 - KAFKA_ADVERTISED_PORT: 9093 - KAFKA_HOST_NAME: kafka2 - KAFKA_PORT: 9093 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 HOSTNAME_COMMAND: hostname -i - KAFKA_LISTENERS: PLAINTEXT://kafka2:9094,SASL_PLAINTEXT://kafka1:9095 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9094,SASL_PLAINTEXT://kafka1:9095 + KAFKA_LISTENERS: PLAINTEXT://kafka2:19092,SASL_PLAINTEXT://kafka2:19093,SASL_SSL://kafka2:19094,SSL://kafka2:19095 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:19092,SASL_PLAINTEXT://kafka2:19093,SASL_SSL://kafka2:19094,SSL://kafka2:19095 KAFKA_NUM_PARTITIONS: 3 KAFKA_SASL_ENABLED_MECHANISMS: PLAIN KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT - # KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT + KAFKA_SSL_KEYSTORE_LOCATION: "/kafka-client/.github/kafka/cert/kafka2/server.keystore.jks" + KAFKA_SSL_KEYSTORE_PASSWORD: phpkafka123456 + KAFKA_SSL_KEY_PASSWORD: phpkafka123456 + KAFKA_SSL_TRUSTSTORE_LOCATION: "/kafka-client/.github/kafka/cert/server.truststore.jks" + KAFKA_SSL_TRUSTSTORE_PASSWORD: phpkafka123456 +# KAFKA_SSL_CLIENT_AUTH: "required" KAFKA_OPTS: "-Djava.security.auth.login.config=/kafka-client/.github/kafka/jaas.conf" command: "/kafka-client/.github/kafka/start_kafka.sh" volumes: - "${GITHUB_WORKSPACE}:/kafka-client:rw" ports: - - "9094:9094" - - "9095:9095" + - "19092:19092" + - "19093:19093" + - "19094:19094" + - "19095:19095" swoole: container_name: "swoole" diff --git a/.github/kafka/cert.sh b/.github/kafka/cert.sh new file mode 100755 index 0000000..d3eee7a --- /dev/null +++ b/.github/kafka/cert.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +BASE_DIR="/kafka-client/.github/kafka/cert" +PASSWORD='phpkafka123456' + + +mkdir -p ${BASE_DIR} +mkdir -p ${BASE_DIR}/kafka1 +mkdir -p ${BASE_DIR}/kafka2 + +cd ${BASE_DIR} || exit 1 + + +# 生成CA证书 +openssl req -new -x509 -keyout ca-key -out ca-cert -days 36500 -subj "/CN=Kakfa ROOT CA/OU=PHPKafka/O=Swoole/L=BeiJing/ST=BeiJing/C=CN" -passin "pass:${PASSWORD}" -passout "pass:${PASSWORD}" +keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert -keypass ${PASSWORD} +keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert -keypass ${PASSWORD} + + +# 生成服务端证书 +#kafka1 +keytool -keystore ./kafka1/server.keystore.jks -alias kafka -validity 36500 -genkey -keypass phpkafka123456 -keyalg RSA -dname "CN=kafka1,OU=PHPKafka,O=Swoole,L=BeiJing,S=BeiJing,C=CN" -storepass phpkafka123456 +keytool -keystore ./kafka1/server.keystore.jks -alias kafka -certreq -file ./kafka1/cert-file -keypass ${PASSWORD} +openssl x509 -req -CA ca-cert -CAkey ca-key -in ./kafka1/cert-file -out ./kafka1/cert-signed -days 36500 -CAcreateserial -passin pass:${PASSWORD} +keytool -keystore ./kafka1/server.keystore.jks -alias CARoot -import -file ca-cert -keypass ${PASSWORD} +keytool -keystore ./kafka1/server.keystore.jks -alias kafka -import -file ./kafka1/cert-signed -keypass ${PASSWORD} + +#kafka2 +keytool -keystore ./kafka2/server.keystore.jks -alias kafka -validity 36500 -genkey -keypass ${PASSWORD} -keyalg RSA -dname "CN=kafka2,OU=PHPKafka,O=Swoole,L=BeiJing,S=BeiJing,C=CN" -storepass phpkafka123456 +keytool -keystore ./kafka2/server.keystore.jks -alias kafka -certreq -file ./kafka2/cert-file -keypass ${PASSWORD} +openssl x509 -req -CA ca-cert -CAkey ca-key -in ./kafka2/cert-file -out ./kafka2/cert-signed -days 36500 -CAcreateserial -passin pass:${PASSWORD} +keytool -keystore ./kafka2/server.keystore.jks -alias CARoot -import -file ca-cert -keypass ${PASSWORD} +keytool -keystore ./kafka2/server.keystore.jks -alias kafka -import -file ./kafka2/cert-signed -keypass ${PASSWORD} + diff --git a/.github/kafka/cert/ca-cert b/.github/kafka/cert/ca-cert new file mode 100644 index 0000000..850ef2c --- /dev/null +++ b/.github/kafka/cert/ca-cert @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIIDvTCCAqWgAwIBAgIUKLNl/S7X9vU4pwUxAuswRdo3ydQwDQYJKoZIhvcNAQEL +BQAwbTEWMBQGA1UEAwwNS2FrZmEgUk9PVCBDQTERMA8GA1UECwwIUEhQS2Fma2Ex +DzANBgNVBAoMBlN3b29sZTEQMA4GA1UEBwwHQmVpSmluZzEQMA4GA1UECAwHQmVp +SmluZzELMAkGA1UEBhMCQ04wIBcNMjEwODIwMTYxMzA3WhgPMjEyMTA3MjcxNjEz +MDdaMG0xFjAUBgNVBAMMDUtha2ZhIFJPT1QgQ0ExETAPBgNVBAsMCFBIUEthZmth +MQ8wDQYDVQQKDAZTd29vbGUxEDAOBgNVBAcMB0JlaUppbmcxEDAOBgNVBAgMB0Jl +aUppbmcxCzAJBgNVBAYTAkNOMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC +AQEAxCd0Ayj2T608ItvRFQx/kWfDRxokqdK9vk5W+RJ5+YrID2bgXTd2eH7Lahqk +9OwxQP4diggI7G4C8pgpAuENoZ3OdZZ2/C8AvYyY3w++CXGXJ4CnrX+4gcWepGaU +G1J3NHH/MKp2o1knirG8JsqT8DhhE6GHg7E/DbqsBvP8gc2egKeqaEnA3F0gTgGL +P26Qvi6d7X6zjkJ8dw0FK+7lqs8d+orcAOdIu/cwTccsUbXkItM/08qxguMw45vq +V1F3XHdxEsukE5zQcqJQ1tT97CplgPHSjzHScJ1FWEc9bJj33QKYHmo9ydcH87od +K8rd5H5havs89xASlw8ZKCLDKwIDAQABo1MwUTAdBgNVHQ4EFgQUwNeB+aYKZ12q +3jOSvhqWvSZRevswHwYDVR0jBBgwFoAUwNeB+aYKZ12q3jOSvhqWvSZRevswDwYD +VR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAfOypnhm94PN7lVgMgL/A +BLTHo2fSnnTzfX3kHazYmNX00Ro+R3A/UB+SjTPfEX4KdvFdWgzliCSqUY0NGQJc +fP26rspDkA2lAumG0ZqU7xdZUraje7eFsHW0MJGhmoajn87DbiwTpKmdSj9Xn3Sp +FMjkoS3Qq2H/++hIl4ryq4lCrkhuBM7iHi0pBa9o6t7TVaAqORizcl5H+5mlpFRJ +Fp4eApyJ2dugey8Ye7WXpPYcBHpdk7mIMltPrtaTyuCz0u2el9kpH5YkhXuJ/BfH +9X4j5QxyjO7gr+J3I2ZGKUHDm+sf0HfB3TPIspCwrc2JTfHE0pTr8o/cWKOB3Z6V +4A== +-----END CERTIFICATE----- diff --git a/.github/kafka/cert/ca-cert.srl b/.github/kafka/cert/ca-cert.srl new file mode 100644 index 0000000..a86a8ab --- /dev/null +++ b/.github/kafka/cert/ca-cert.srl @@ -0,0 +1 @@ +69C1A3EBB2D20686DF3CA8B6E7E99E29C1A93ED3 diff --git a/.github/kafka/cert/ca-key b/.github/kafka/cert/ca-key new file mode 100644 index 0000000..ea3405a --- /dev/null +++ b/.github/kafka/cert/ca-key @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQITdvTpQZP4Q0CAggA +MAwGCCqGSIb3DQIJBQAwFAYIKoZIhvcNAwcECJ6+GA/vtWHUBIIEyFKhNTuLdOWm +mSKqoDPH+VrCXLoFR6ZVHdPjbpeeupeGAswncAz/huvS9lTdTLFeSapbCstJ5O0R +ycXLJI3QEYQJokOtwUUtwjPGdZPuhrDI/mYTXUnwxsqKtD2RlafR+ibY9ARvIYuT +d5QCMthnBws4+f9IDzcwanOFpuOHW2OEgyhz4vpAhzbwjdLFD4p1zZXOnG61asqR +rnot+Nkfdl8mK3Kv/HMih8W+ZHuwQYlBAZ4UpW/X5mV1wOMLL4cul+LatfBRHJV0 +GvuT+crDQvnjwZx91ENrPOg7cLpBiT6OvoduKs+uOTgEGUFsEbxbUvtDwFTfrXVu +McX5nTyDwaq06+ROmkbTRZF360ixnVNlGm3LEmd3Kq2o9EVt680RqMFk5lvW3Cz1 +WoKXfsoXjusR/FnWYNdY6FnfZSFWZxlJy+xe+gmentunehO2HhrU1I2M5cY0jV2G +r4kiMiAJTPblHuFIB+kq8fMh8jt8WtNAAr1cj1Scynz+fd11+tGeNMAkQ5ZaPMzj +ta6YawkVNMhGKGoPPcGNqF/ma1PFxpGNPCVk31gK07ihQixsHe1vayHsxtmTcD/e +si53AmXjbkvyEVYUAA7k3J0BoS0OBbcGwv8IJrpgf8c+qhnllqosbF65W9fTCyKS +goKZQ2zG4Q1nkiTCDf6sIu1WNkQaVgZEWB7XXt454VkEtSReI4FsVfxsC1SdsT1a +cYsZnWKOMuYsHZ9EyW2kCSbrNqPxWRqgNnt+SYIJ5OaOtkS/WU3winhFq7bMrYY4 +M/PAkLLL7C+I72IxApNWvMbsjyh+zfmAsGmNtvqrhcI4pAPC6lSq8z5TtuUztyFb +nNvOIaAv/v977rVLoyJL3XpYGO9GuSEoNNmQxuCQPBAPV35IArH7Yhdi2lk4MZ2g +qKyA7SP5eVi8LRJVT9Sr99FxXZC3QCEkNBMorvjHjcx+vd5V5KEojfv1DACvQ58R +w9ixAG0uhzmyva02ggfXcfV+NRP2sVI6+Ha0uFpMK3tHtLHFqk4YdfaOTXV3JYKJ +4fu5NK4BjE11RoKd9tI7jpCxhlTN5pL9AmrDrcdhbQJEqpyxw2oLJwqtc4jOVv7m +cUMvPBn5bv3a/Pb+EMzCQSzcPvoDRj4TW56LZeCgA/XwC+UM07KHjJLZkdN2LeFS +mrF5hc6GX8wPBw+8m4r1bwv1ZemNMI9UU5o6TEd2C8XVwzrwqeGjWYyR1WmIJnjl +oRiXIv8WCSL3wba7atVTctW7SDphg0U5cnl93DNhQHIdgbGOgoug3nHl9UJcGF46 +1cUC+yXQZcH+M3hMlsPK1lddEFJKtSsZxijmaxiRFGMX+MNCyuaKQrX1DHwK1F1L +2vmdqzBKm4yfxogfZuu3YyW7ElzYMs+wzsPzSKvOlx1SvE6QqO6KrSVN6EBLr/oj +BXh6qOuZhnT4yofxdQu1pJ/GCtQV53/FS2B3NrjfSSQxL6l8lBPUD1sDX6MHIjsH +aZByCbRssVBozdAIMzP6LhJKpp10Loo4u+IT0Jh1yAqVSnIUI05yOWwJKYFY7smw +4XN5IXJxtA8KVIfx4wIJPbibTPk//lvZwEG1gWWYbE4MUkxjtbqdxrykeo+Qxx8K +6PIusaZ6ImhMp562DHKxzg== +-----END ENCRYPTED PRIVATE KEY----- diff --git a/.github/kafka/cert/client.truststore.jks b/.github/kafka/cert/client.truststore.jks new file mode 100644 index 0000000..b0cf947 Binary files /dev/null and b/.github/kafka/cert/client.truststore.jks differ diff --git a/.github/kafka/cert/kafka1/cert-file b/.github/kafka/cert/kafka1/cert-file new file mode 100644 index 0000000..fcac18c --- /dev/null +++ b/.github/kafka/cert/kafka1/cert-file @@ -0,0 +1,18 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIC2zCCAcMCAQAwZjELMAkGA1UEBhMCQ04xEDAOBgNVBAgTB0JlaUppbmcxEDAO +BgNVBAcTB0JlaUppbmcxDzANBgNVBAoTBlN3b29sZTERMA8GA1UECxMIUEhQS2Fm +a2ExDzANBgNVBAMTBmthZmthMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAKqYF56/qlrZ+dOPFCOlqCvJDSODJ1gF4MJZC8fvrGMwKP5lD/AhCGVKcqcA +N4p9EAV35b+DylRaBECSAWicDLp81nEqmLxGmaNvku8s8CuFY2X13SXxXJDCDahW +N/57SDz7EdCIY85wk1Ad0B5WvGZJnnukqhuAHzRCxThn+PvDTSXvry1CwuScB5Zy +RyaHxOEk1vN58gPxPsKPbtv9p6d/fapyJ/VuQ4sI4ZsTyJxMmFlhAgQQNkBlCcVc +EOdXn8ZhEWPh9Oblr+tcvs5oKZgA788hG9qsPecDi3ONez6qxJUao/bklnY+NpJq +XTWD9zBIBS5qJOhiN5Z2U8WhtIcCAwEAAaAwMC4GCSqGSIb3DQEJDjEhMB8wHQYD +VR0OBBYEFIyGlqi2MA7iOY1rggSGwUAisOHSMA0GCSqGSIb3DQEBCwUAA4IBAQBs +qtZHIX4GycmNOPLhZjCZd1aX8LJIZZHkt4OaHDMisqT5jxXTqk0JBrozTWJ+V9UF +0P0Hkbi5tEJKSrfVFtddX6t9oGOaIkpdj+d3L8vOr5tq/XwoPyVUrgsIW6dw4G8d +SsSlYy4PiirITtcUAOFqn/gQSkscQpBrrpqZb8SBjAys63URMvZWJr+e2TcnWm9G +NOcwWAl12M2rzaCFooMoMqURWcyUEsPS39HkJZT9pdlUeztdOLUVZfrGDh4OQtAK +/wv6XIZyqgucCuyZehqSY956QCv+/B5CQWOc8sCknG5JQMQzfCWHTPQ83YVJGHRt +tI3oTaAXLGjJILuQzA1O +-----END NEW CERTIFICATE REQUEST----- diff --git a/.github/kafka/cert/kafka1/cert-signed b/.github/kafka/cert/kafka1/cert-signed new file mode 100644 index 0000000..35bf333 --- /dev/null +++ b/.github/kafka/cert/kafka1/cert-signed @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDXDCCAkQCFGnBo+uy0gaG3zyotufpninBqT7SMA0GCSqGSIb3DQEBCwUAMG0x +FjAUBgNVBAMMDUtha2ZhIFJPT1QgQ0ExETAPBgNVBAsMCFBIUEthZmthMQ8wDQYD +VQQKDAZTd29vbGUxEDAOBgNVBAcMB0JlaUppbmcxEDAOBgNVBAgMB0JlaUppbmcx +CzAJBgNVBAYTAkNOMCAXDTIxMDgyMDE2MTMyMFoYDzIxMjEwNzI3MTYxMzIwWjBm +MQswCQYDVQQGEwJDTjEQMA4GA1UECBMHQmVpSmluZzEQMA4GA1UEBxMHQmVpSmlu +ZzEPMA0GA1UEChMGU3dvb2xlMREwDwYDVQQLEwhQSFBLYWZrYTEPMA0GA1UEAxMG +a2Fma2ExMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqpgXnr+qWtn5 +048UI6WoK8kNI4MnWAXgwlkLx++sYzAo/mUP8CEIZUpypwA3in0QBXflv4PKVFoE +QJIBaJwMunzWcSqYvEaZo2+S7yzwK4VjZfXdJfFckMINqFY3/ntIPPsR0IhjznCT +UB3QHla8Zkmee6SqG4AfNELFOGf4+8NNJe+vLULC5JwHlnJHJofE4STW83nyA/E+ +wo9u2/2np399qnIn9W5DiwjhmxPInEyYWWECBBA2QGUJxVwQ51efxmERY+H05uWv +61y+zmgpmADvzyEb2qw95wOLc417PqrElRqj9uSWdj42kmpdNYP3MEgFLmok6GI3 +lnZTxaG0hwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQAzqqqoFtvQSzUIlLfFgMhN +NUAedhbixTUodh9z8OizlCGZw3Z7+Qkr1LkKtv81JWTiCP4f+WWCe7KWgkxwq2wD +yMJS/tKVZqBxVD8NcTQo7NvuXDhSWfGHqx52CcsLxw3KB7Oilo/x1fHqWNCE371J +/Vtz9yYak9Gx/b8vCY9eTb3x8h0+1SdLJtgxMAPPhCExDn7AfNzVfDm0RzmJEFWK +7kksI+N15QmeuooaxYB2yC7WspvsoRNeCOKoejlLUlpNBCWIDqiFnFv4lKayk0EA +pb46/6lM7F6SkglaonKmxh2m6vGaYoy4EVQ5gjZ6t9kolIBQdSnYTIRjweQnJ5cj +-----END CERTIFICATE----- diff --git a/.github/kafka/cert/kafka1/server.keystore.jks b/.github/kafka/cert/kafka1/server.keystore.jks new file mode 100644 index 0000000..98bd91a Binary files /dev/null and b/.github/kafka/cert/kafka1/server.keystore.jks differ diff --git a/.github/kafka/cert/kafka2/cert-file b/.github/kafka/cert/kafka2/cert-file new file mode 100644 index 0000000..1500377 --- /dev/null +++ b/.github/kafka/cert/kafka2/cert-file @@ -0,0 +1,18 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIC2zCCAcMCAQAwZjELMAkGA1UEBhMCQ04xEDAOBgNVBAgTB0JlaUppbmcxEDAO +BgNVBAcTB0JlaUppbmcxDzANBgNVBAoTBlN3b29sZTERMA8GA1UECxMIUEhQS2Fm +a2ExDzANBgNVBAMTBmthZmthMjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAJ0UD7tLVKdQonnvOQdIMt8XNc2EzvPKfsqaeE6226hyQFmrWTmm0A/VxLO6 +zqYLLAYXezGXsz9D7b3V1RsiZnctX6ZiVXtJ9ZjTrL8u8Si2+FXvbEN7pSMImvIe +DYKtl1wu3RtuYfBN31e0xB8LrxBfipruybj5SoisoX7GFSa1wKXWadh9pAm/RH0R +uOqtmlO+7L8rQQYcRQ5CeUb3bEPZXR43URwKPWqWezyTYXSrsHC7pcAd1L5AsDNw +XKVECpV7PyTmmchpKNIKdJDK27jxix4kUM/hne3AalSvMEkXRBmliOVPE68adQm/ +SBklzsQDZFhuSYPBgYPSyLUAnL8CAwEAAaAwMC4GCSqGSIb3DQEJDjEhMB8wHQYD +VR0OBBYEFIM4VkNA0LqoKBcu4pU1EufrNvbcMA0GCSqGSIb3DQEBCwUAA4IBAQAr +X7E1Sm47F11T35msV34rTfMk9tJUa2Rdpj6WTWuaHSJeFu2fyWXHEYNSwX2qV1Iv +LlYfJ3vkAKtEl5wXe8hmIGP4Gs9KDOWGP6QltRxsLMlW70qFQDP46Uw578+lkB5Z +aFTWCh1A48XL99QERxZAxfXmvq1NQnZ0LD1wuCG7VSFWXiqjIl4oOwDzMsifNJ7N +7EVC/2mE8Gkm7HQr8+mbowlwR9woUhJcYFnxQJgW0mbj3UeDOYI60dJKaVy9SKev +B4wfAX8NwEfqUksllGyQlpKCm+zohEItbXXV/X1bcwZjraSsblntzwjnTEAEaWP1 +aJXCod6x06O3/CWm3KQ7 +-----END NEW CERTIFICATE REQUEST----- diff --git a/.github/kafka/cert/kafka2/cert-signed b/.github/kafka/cert/kafka2/cert-signed new file mode 100644 index 0000000..d910d1c --- /dev/null +++ b/.github/kafka/cert/kafka2/cert-signed @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDXDCCAkQCFGnBo+uy0gaG3zyotufpninBqT7TMA0GCSqGSIb3DQEBCwUAMG0x +FjAUBgNVBAMMDUtha2ZhIFJPT1QgQ0ExETAPBgNVBAsMCFBIUEthZmthMQ8wDQYD +VQQKDAZTd29vbGUxEDAOBgNVBAcMB0JlaUppbmcxEDAOBgNVBAgMB0JlaUppbmcx +CzAJBgNVBAYTAkNOMCAXDTIxMDgyMDE2MTMyNloYDzIxMjEwNzI3MTYxMzI2WjBm +MQswCQYDVQQGEwJDTjEQMA4GA1UECBMHQmVpSmluZzEQMA4GA1UEBxMHQmVpSmlu +ZzEPMA0GA1UEChMGU3dvb2xlMREwDwYDVQQLEwhQSFBLYWZrYTEPMA0GA1UEAxMG +a2Fma2EyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnRQPu0tUp1Ci +ee85B0gy3xc1zYTO88p+ypp4TrbbqHJAWatZOabQD9XEs7rOpgssBhd7MZezP0Pt +vdXVGyJmdy1fpmJVe0n1mNOsvy7xKLb4Ve9sQ3ulIwia8h4Ngq2XXC7dG25h8E3f +V7TEHwuvEF+Kmu7JuPlKiKyhfsYVJrXApdZp2H2kCb9EfRG46q2aU77svytBBhxF +DkJ5RvdsQ9ldHjdRHAo9apZ7PJNhdKuwcLulwB3UvkCwM3BcpUQKlXs/JOaZyGko +0gp0kMrbuPGLHiRQz+Gd7cBqVK8wSRdEGaWI5U8Trxp1Cb9IGSXOxANkWG5Jg8GB +g9LItQCcvwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQBEVESu7rs89GqugQeLD9M5 +6JqpRKQlQ3eL0+rngj6wPyvFh7Wz5sx3dt1YliH0i+WkFXfl5eh5a1DDg6cdxmy1 +2EAqQLyJbxXjVJQ4NbVaTiY1Bflzs2nMt3gY4haPZmHnTGY7/1PKn11KXH6pC9t/ +oGb7VOnRAPGa3QxSDWgyfFK3gK+EpHRsEuAaaIlAKse0Zu+aZgkp/OQaPK+6sCv/ +DH7KraeNoeeDaHC1sraeZdKR1Gn6ZkBn2l1T8PdMXx9aclfQAWxkF0Ex/SZObN0a +jCO0DzlR0IOQ0eXfdKiDC5t+B0Ifh5Li0nt8nuX3GpbERe50KXqB+XIw5VGIWauG +-----END CERTIFICATE----- diff --git a/.github/kafka/cert/kafka2/server.keystore.jks b/.github/kafka/cert/kafka2/server.keystore.jks new file mode 100644 index 0000000..01a208a Binary files /dev/null and b/.github/kafka/cert/kafka2/server.keystore.jks differ diff --git a/.github/kafka/cert/server.truststore.jks b/.github/kafka/cert/server.truststore.jks new file mode 100644 index 0000000..f3ff342 Binary files /dev/null and b/.github/kafka/cert/server.truststore.jks differ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 57a057a..3e82ef7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,9 +17,12 @@ jobs: KAFKA_VERSION: ${{ matrix.kafka }} SWOOLE_VERSION: "4.5" KAFKA_HOST: kafka1 - KAFKA_PORT: 9092 - KAFKA_SASL_PORT: 9093 + KAFKA_PLAINTEXT_PORT: 9092 + KAFKA_SASL_PLAINTEXT_PORT: 9093 + KAFKA_SASL_SSL_PORT: 9094 + KAFKA_SSL_PORT: 9095 KAFKA_SASL: '{"type":"longlang\\phpkafka\\Sasl\\PlainSasl","username":"admin","password":"admin-secret"}' + KAFKA_SSL: '{"open":true,"verifyPeer":true,"allowSelfSigned":true,"verifyPeerName":true,"cafile":"/kafka-client/.github/kafka/cert/ca-cert"}' steps: - uses: actions/checkout@v2 @@ -38,14 +41,22 @@ jobs: docker exec swoole composer update docker exec kafka1 /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test - - name: php-test - run: docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" swoole composer test + - name: plaintext-test + run: | + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_PLAINTEXT_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" swoole composer test + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_PLAINTEXT_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" swoole composer swoole-test - - name: swoole-test - run: docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" swoole composer swoole-test + - name: sasl-plaintext-test + run: | + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_PLAINTEXT_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" swoole composer test + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_PLAINTEXT_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" swoole composer swoole-test - - name: php-test-sasl - run: docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" swoole composer test + - name: sasl-ssl-test + run: | + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_SSL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" -e KAFKA_SSL="$KAFKA_SSL" swoole composer test + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_SSL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" -e KAFKA_SSL="$KAFKA_SSL" swoole composer swoole-test - - name: swoole-test-sasl - run: docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SASL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SASL="$KAFKA_SASL" swoole composer swoole-test \ No newline at end of file + - name: ssl-test + run: | + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SSL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SSL="$KAFKA_SSL" swoole composer test + docker exec -e KAFKA_HOST="$KAFKA_HOST" -e KAFKA_PORT="$KAFKA_SSL_PORT" -e KAFKA_VERSION="$KAFKA_VERSION" -e KAFKA_SSL="$KAFKA_SSL" swoole composer swoole-test diff --git a/README.cn.md b/README.cn.md index e911e64..7e7d1b8 100644 --- a/README.cn.md +++ b/README.cn.md @@ -22,7 +22,7 @@ PHP Kafka 客户端,支持 PHP-FPM、Swoole 环境使用。 - [x] 生产者类 - [x] 消费者类 - [x] SASL 鉴权 -- [ ] SSL 加密通信 +- [x] SSL 加密通信 - [ ] 更多功能的封装及测试用例编写 ## 环境要求 diff --git a/README.md b/README.md index 5e4e592..d354216 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ The communication protocol is based on the JSON file in Java. PHP Kafka client s - [x] Producer - [x] Consumer - [x] SASL -- [ ] SSL +- [x] SSL - [ ] More features and test cases ## Environment diff --git a/doc/consumer.en.md b/doc/consumer.en.md index d3ffe8f..595c16b 100644 --- a/doc/consumer.en.md +++ b/doc/consumer.en.md @@ -41,6 +41,7 @@ Class `longlang\phpkafka\Consumer\ConsumerConfig` | maxBytes | Max bytes | `128 * 1024 * 1024` | | maxWait | The maximum time. (unit: second, decimal) | `1` | | sasl | SASL authentication Info. If the field is null, it will not authenticate with SASL [detail](#SASL-Support) | `[]`| +| ssl | SSL Connect Info. If the field is null, it will not use SSL [detail](#SSL-Support) | `null`| ## Asynchronous (callback) @@ -116,3 +117,43 @@ $consumer = new Consumer($config); // .... Your Business Code ``` + +## SSL Support +Class `longlang\phpkafka\Config\SslConfig` + +> You can pass an array to a constructor. + +### Configuration keys +| Key | Description | Default | +| - | - | - | +| open | Enable SSL | `false` | +| compression | TLS compression. | `true` | +| certFile |Path to local certificate file on filesystem. |`''`| +| keyFile |Path to local private key file on filesystem|`''`| +| passphrase | Passphrase with which your ``certFile`` file was encoded. | `''`| +| peerName | Peer name to be used. If this value is not set, then the name is remote Host | `''`| +| verifyPeer |Require verification of SSL certificate used. | `false` | +| verifyPeerName |Require verification of peer name.| `false` | +| verifyDepth | Abort if the certificate chain is too deep. | `0`| +| allowSelfSigned | Allow self-signed certificates. | `false` | +| cafile | Location of Certificate Authority file on local filesystem which should be used | `''`| +| capath | If cafile is not specified or if the certificate is not found there, the directory pointed to by capath is searched for a suitable certificate. capath must be a correctly hashed certificate directory. | `''`| + +**Example** + +```php +use longlang\phpkafka\Consumer\Consumer; +use longlang\phpkafka\Consumer\ConsumerConfig; +use longlang\phpkafka\Config\SslConfig; + +$config = new ConsumerConfig(); +// .... Your Othor Config +$sslConfig = new SslConfig(); +$sslConfig->setOpen(true); +$sslConfig->setVerifyPeer(true); +$sslConfig->setAllowSelfSigned(true); +$sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert"); +$config->setSsl($sslConfig); +$consumer = new Consumer($config); +// .... Your Business Code +``` \ No newline at end of file diff --git a/doc/consumer.md b/doc/consumer.md index ae015dd..9e1e28b 100644 --- a/doc/consumer.md +++ b/doc/consumer.md @@ -41,6 +41,7 @@ | maxBytes | 最大字节数 | `128 * 1024 * 1024` | | maxWait | 最大等待时间,单位:秒 | `1` | | sasl | SASL身份认证信息。为空则不发送身份认证信息 [详情](#SASL支持) | `[]`| +| ssl | SSL链接相关信息,为空则不使用SSL [详情](#SSL支持) | `null` | ## 异步消费(回调) @@ -114,4 +115,43 @@ $config->setSasl([ ]); $consumer = new Consumer($config); // .... 你的业务代码 +``` + +## SSL支持 +类名:`longlang\phpkafka\Config\SslConfig` + +> 支持构造方法传入数组赋值 +### 配置参数 +|参数名|说明|默认值| +| - | - | - | +| open | 是否开启SSL传输加密 | `false` | +| compression | 是否开启压缩 | `true` | +| certFile |cert证书存放路径|`''`| +| keyFile |私钥存放路径|`''`| +| passphrase | cert证书密码 | `''`| +| peerName| 服务器主机名。默认为链接的host| `''`| +| verifyPeer |是否校验远端证书 | `false` | +| verifyPeerName |是否校验远端服务器名称 | `false` | +| verifyDepth | 如果证书链条层次太深,超过了本选项的设定值,则终止验证。 默认不校验层级 | `0`| +| allowSelfSigned | 是否允许自签证书 | `false` | +| cafile | CA证书路径 | `''`| +| capath | CA证书目录。会自动扫描该路径下所有pem文件 | `''`| + +**代码示例:** + +```php +use longlang\phpkafka\Consumer\Consumer; +use longlang\phpkafka\Consumer\ConsumerConfig; +use longlang\phpkafka\Config\SslConfig; + +$config = new ConsumerConfig(); +// .... 你的其他配置 +$sslConfig = new SslConfig(); +$sslConfig->setOpen(true); +$sslConfig->setVerifyPeer(true); +$sslConfig->setAllowSelfSigned(true); +$sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert"); +$config->setSsl($sslConfig); +$consumer = new Consumer($config); +// .... 你的业务代码 ``` \ No newline at end of file diff --git a/doc/producer.en.md b/doc/producer.en.md index c3aee26..ac82a8c 100644 --- a/doc/producer.en.md +++ b/doc/producer.en.md @@ -30,6 +30,7 @@ Class `longlang\phpkafka\Producer\ProducerConfig` | produceRetry | Produce message retries allowed if matching an error code. | `3` | | produceRetrySleep | Produce message retry sleep time. (unit: second) | `0.1` | | sasl | SASL authentication Info. If the field is null, it will not authenticate with SASL [detail](#SASL-Support) | `[]`| +| ssl | SSL Connect Info. If the field is null, it will not use SSL [detail](#SSL-Support) | `null` | **Default partitioning strategy:** @@ -92,6 +93,7 @@ $producer->sendBatch([ **Example** ```php use longlang\phpkafka\Producer\ProducerConfig; +use longlang\phpkafka\Producer\Producer; $config = new ProducerConfig(); // .... Your Other Config @@ -103,3 +105,44 @@ $config->setSasl([ $producer = new Producer($config); // .... Your Business Code ``` + + +## SSL Support +Class `longlang\phpkafka\Config\SslConfig` + +> You can pass an array to a constructor. + +### Configuration keys +| Key | Description | Default | +| - | - | - | +| open | Enable SSL | `false` | +| compression | TLS compression. | `true` | +| certFile |Path to local certificate file on filesystem. |`''`| +| keyFile |Path to local private key file on filesystem|`''`| +| passphrase | Passphrase with which your ``certFile`` file was encoded. | `''`| +| peerName | Peer name to be used. If this value is not set, then the name is remote Host | `''`| +| verifyPeer |Require verification of SSL certificate used. | `false` | +| verifyPeerName |Require verification of peer name.| `false` | +| verifyDepth | Abort if the certificate chain is too deep. | `0`| +| allowSelfSigned | Allow self-signed certificates. | `false` | +| cafile | Location of Certificate Authority file on local filesystem which should be used | `''`| +| capath | If cafile is not specified or if the certificate is not found there, the directory pointed to by capath is searched for a suitable certificate. capath must be a correctly hashed certificate directory. | `''`| + +**Example** + +```php +use longlang\phpkafka\Producer\ProducerConfig; +use longlang\phpkafka\Producer\Producer; +use longlang\phpkafka\Config\SslConfig; + +$config = new ProducerConfig(); +// .... Your Othor Config +$sslConfig = new SslConfig(); +$sslConfig->setOpen(true); +$sslConfig->setVerifyPeer(true); +$sslConfig->setAllowSelfSigned(true); +$sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert"); +$config->setSsl($sslConfig); +$producer = new Producer($config); +// .... Your Business Code +``` \ No newline at end of file diff --git a/doc/producer.md b/doc/producer.md index f9fed27..6666683 100644 --- a/doc/producer.md +++ b/doc/producer.md @@ -30,6 +30,7 @@ | produceRetry | 生产消息,匹配预设的错误码时,自动重试次数 | `3` | | produceRetrySleep | 生产消息重试延迟,单位:秒 | `0.1` | | sasl | SASL身份认证信息。为空则不发送身份认证信息 [详情](#SASL支持) | `[]`| +| ssl | SSL链接相关信息,为空则不使用SSL [详情](#SSL支持) | `null` | **默认分区策略:** @@ -104,4 +105,45 @@ $config->setSasl([ ]); $producer = new Producer($config); // .... 你的业务代码 +``` + + +## SSL支持 +类名:`longlang\phpkafka\Config\SslConfig` + +> 支持构造方法传入数组赋值 +### 配置参数 + +|参数名|说明|默认值| +| - | - | - | +| open | 是否开启SSL传输加密 | `false` | +| compression | 是否开启压缩 | `true` | +| certFile |cert证书存放路径|`''`| +| keyFile |私钥存放路径|`''`| +| passphrase | cert证书密码 | `''`| +| peerName| 服务器主机名。默认为链接的host| `''`| +| verifyPeer |是否校验远端证书 | `false` | +| verifyPeerName |是否校验远端服务器名称 | `false` | +| verifyDepth | 如果证书链条层次太深,超过了本选项的设定值,则终止验证。 默认不校验层级 | `0`| +| allowSelfSigned | 是否允许自签证书 | `false` | +| cafile | CA证书路径 | `''`| +| capath | CA证书目录。会自动扫描该路径下所有pem文件 | `''`| + +**代码示例:** + +```php +use longlang\phpkafka\Producer\ProducerConfig; +use longlang\phpkafka\Producer\Producer; +use longlang\phpkafka\Config\SslConfig; + +$config = new ProducerConfig(); +// .... 你的其他配置 +$sslConfig = new SslConfig(); +$sslConfig->setOpen(true); +$sslConfig->setVerifyPeer(true); +$sslConfig->setAllowSelfSigned(true); +$sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert"); +$config->setSsl($sslConfig); +$producer = new Producer($config); +// .... 你的业务代码 ``` \ No newline at end of file diff --git a/src/Config/CommonConfig.php b/src/Config/CommonConfig.php index 2708ca3..b50af86 100644 --- a/src/Config/CommonConfig.php +++ b/src/Config/CommonConfig.php @@ -59,6 +59,11 @@ class CommonConfig extends AbstractConfig */ protected $sasl = []; + /** + * @var SslConfig|null + */ + protected $ssl = null; + public function getConnectTimeout(): float { return $this->connectTimeout; @@ -188,4 +193,29 @@ public function setSasl(array $sasl): self return $this; } + + public function getSsl(): SslConfig + { + if (null == $this->ssl) { + return new SslConfig([]); + } + + return $this->ssl; + } + + /** + * @param SslConfig|array $ssl + */ + public function setSsl($ssl): self + { + if (\is_array($ssl)) { + $this->ssl = new SslConfig($ssl); + } elseif ($ssl instanceof SslConfig) { + $this->ssl = $ssl; + } else { + throw new InvalidArgumentException(sprintf('The ssl must be array or SslConfig, and the current type is %s', \gettype($ssl))); + } + + return $this; + } } diff --git a/src/Config/SslConfig.php b/src/Config/SslConfig.php new file mode 100644 index 0000000..8ee07ae --- /dev/null +++ b/src/Config/SslConfig.php @@ -0,0 +1,245 @@ +open; + } + + public function setOpen(bool $open): self + { + $this->open = $open; + + return $this; + } + + public function getCompression(): bool + { + return $this->compression; + } + + public function setCompression(bool $compression): self + { + $this->compression = $compression; + + return $this; + } + + public function getCertFile(): string + { + return $this->certFile; + } + + public function setCertFile(string $certFile): void + { + $this->certFile = $certFile; + } + + public function getKeyFile(): string + { + return $this->keyFile; + } + + public function setKeyFile(string $keyFile): void + { + $this->keyFile = $keyFile; + } + + public function getPassphrase(): string + { + return $this->passphrase; + } + + public function setPassphrase(string $passphrase): self + { + $this->passphrase = $passphrase; + + return $this; + } + + public function getPeerName(): string + { + return $this->peerName; + } + + public function setPeerName(string $peerName): self + { + $this->peerName = $peerName; + + return $this; + } + + public function getVerifyPeer(): bool + { + return $this->verifyPeer; + } + + public function setVerifyPeer(bool $verifyPeer): void + { + $this->verifyPeer = $verifyPeer; + } + + public function getVerifyPeerName(): bool + { + return $this->verifyPeerName; + } + + public function setVerifyPeerName(bool $verifyPeerName): self + { + $this->verifyPeerName = $verifyPeerName; + + return $this; + } + + public function getAllowSelfSigned(): bool + { + return $this->allowSelfSigned; + } + + public function setAllowSelfSigned(bool $allowSelfSigned): void + { + $this->allowSelfSigned = $allowSelfSigned; + } + + public function getCafile(): string + { + return $this->cafile; + } + + public function setCafile(string $cafile): self + { + $this->cafile = $cafile; + + return $this; + } + + public function getCapath(): string + { + return $this->capath; + } + + public function setCapath(string $capath): self + { + $this->capath = $capath; + + return $this; + } + + public function getVerifyDepth(): int + { + return $this->verifyDepth; + } + + public function setVerifyDepth(int $verifyDepth): void + { + $this->verifyDepth = $verifyDepth; + } + + public function getSwooleConfig(string $host): array + { + if (!$this->getOpen()) { + return []; + } + $config['ssl_compress'] = $this->getCompression(); + '' != $this->getCertFile() && $config['ssl_cert_file'] = $this->getCertFile(); + '' != $this->getKeyFile() && $config['ssl_key_file'] = $this->getKeyFile(); + '' != $this->getPassphrase() && $config['ssl_passphrase'] = $this->getPassphrase(); + '' != $this->getCafile() && $config['ssl_cafile'] = $this->getCafile(); + '' != $this->getCapath() && $config['ssl_capath'] = $this->getCapath(); + $config['ssl_verify_peer'] = $this->getVerifyPeer(); + $config['ssl_allow_self_signed'] = $this->getAllowSelfSigned(); + $config['ssl_verify_depth'] = $this->getVerifyDepth(); + if ($this->getVerifyPeerName()) { + $config['ssl_host_name'] = $this->getPeerName() ?: $host; + } + + return $config; + } + + public function getStreamConfig(string $host): array + { + if (!$this->getOpen()) { + return []; + } + $config['disable_compression'] = !$this->getCompression(); + '' != $this->getCertFile() && $config['local_cert'] = $this->getCertFile(); + '' != $this->getKeyFile() && $config['local_pk'] = $this->getKeyFile(); + '' != $this->getPassphrase() && $config['passphrase'] = $this->getPassphrase(); + '' != $this->getCafile() && $config['cafile'] = $this->getCafile(); + '' != $this->getCapath() && $config['capath'] = $this->getCapath(); + $config['verify_peer'] = $this->getVerifyPeer(); + $config['allow_self_signed'] = $this->getAllowSelfSigned(); + if ($this->getVerifyDepth() > 0) { + $config['verify_depth'] = $this->getVerifyDepth(); + } + $config['verify_peer_name'] = $this->getVerifyPeerName(); + $config['peer_name'] = $this->getPeerName() ?: $host; + + return $config; + } +} diff --git a/src/Socket/StreamSocket.php b/src/Socket/StreamSocket.php index 21cbefb..32a989c 100644 --- a/src/Socket/StreamSocket.php +++ b/src/Socket/StreamSocket.php @@ -76,13 +76,14 @@ public function isConnected(): bool public function connect(): void { - $uri = sprintf('tcp://%s:%s', $this->host, $this->port); + $uri = $this->getURI(); $socket = stream_socket_client( $uri, $errno, $errstr, $this->config->getConnectTimeout(), - \STREAM_CLIENT_CONNECT + \STREAM_CLIENT_CONNECT, + $this->getContext() ); if (!\is_resource($socket)) { @@ -239,4 +240,25 @@ protected function getMetaData(): array { return stream_get_meta_data($this->socket); } + + /** + * @return resource + */ + protected function getContext() + { + return stream_context_create([ + 'ssl' => $this->config->getSsl()->getStreamConfig($this->getHost()), + ]); + } + + protected function getURI(): string + { + $protocol = 'tcp'; + $ssl = $this->getConfig()->getSsl(); + if ($ssl->getOpen()) { + $protocol = 'ssl'; + } + + return sprintf('%s://%s:%s', $protocol, $this->host, $this->port); + } } diff --git a/src/Socket/SwooleSocket.php b/src/Socket/SwooleSocket.php index 015592b..7c8aab7 100644 --- a/src/Socket/SwooleSocket.php +++ b/src/Socket/SwooleSocket.php @@ -83,12 +83,8 @@ public function isConnected(): bool public function connect(): void { $config = $this->config; - $client = new Client(\SWOOLE_SOCK_TCP); - $client->set([ - 'connect_timeout' => $config->getConnectTimeout(), - 'read_timeout' => $config->getRecvTimeout(), - 'write_timeout' => $config->getSendTimeout(), - ]); + $client = new Client($this->getClientType()); + $client->set($this->getClientConfig()); if ($client->connect($this->host, $this->port)) { $this->socket = $client; } else { @@ -150,4 +146,27 @@ public function recv(int $length, ?float $timeout = null): string return ''; } + + protected function getClientType(): int + { + $clientType = \SWOOLE_SOCK_TCP; + $ssl = $this->getConfig()->getSsl(); + if ($ssl->getOpen()) { + $clientType = $clientType | \SWOOLE_SSL; + } + + return $clientType; + } + + protected function getClientConfig(): array + { + $config = $this->getConfig(); + $clientConfig = [ + 'connect_timeout' => $config->getConnectTimeout(), + 'read_timeout' => $config->getRecvTimeout(), + 'write_timeout' => $config->getSendTimeout(), + ]; + + return array_merge($clientConfig, $config->getSsl()->getSwooleConfig($this->getHost())); + } } diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php index 7f35aeb..a80835c 100644 --- a/tests/ConsumerTest.php +++ b/tests/ConsumerTest.php @@ -15,7 +15,7 @@ public function testConsumeWithRangeAssignor(): void { $config = new ConsumerConfig(); $config->setBroker(TestUtil::getHost() . ':' . TestUtil::getPort()); - $config->setSasl(TestUtil::getSasl()); + TestUtil::addConfigInfo($config); $config->setTopic('test'); $config->setGroupId('testGroup'); $config->setClientId('testConsumeWithRangeAssignor'); @@ -35,7 +35,7 @@ public function testConsumeWithRoundRobinAssignor(): void { $config = new ConsumerConfig(); $config->setBroker(TestUtil::getHost() . ':' . TestUtil::getPort()); - $config->setSasl(TestUtil::getSasl()); + TestUtil::addConfigInfo($config); $config->setTopic('test'); $config->setGroupId('testGroup'); $config->setClientId('testConsumeWithRoundRobinAssignor'); @@ -55,7 +55,7 @@ public function testConsumeWithStickyAssignor(): void { $config = new ConsumerConfig(); $config->setBroker([TestUtil::getHost() . ':' . TestUtil::getPort()]); - $config->setSasl(TestUtil::getSasl()); + TestUtil::addConfigInfo($config); $config->setTopic('test'); $config->setGroupId('testGroup'); $config->setClientId('testConsumeWithStickyAssignor'); diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php index e918876..08c4f9b 100644 --- a/tests/ProducerTest.php +++ b/tests/ProducerTest.php @@ -15,7 +15,7 @@ public function testSend(): void { $config = new ProducerConfig(); $config->setBootstrapServer(TestUtil::getHost() . ':' . TestUtil::getPort()); - $config->setSasl(TestUtil::getSasl()); + TestUtil::addConfigInfo($config); $config->setAcks(-1); $producer = new Producer($config); $producer->send('test', (string) microtime(true), uniqid('', true), [], 0); @@ -31,7 +31,7 @@ public function testSendBatch(): void { $config = new ProducerConfig(); $config->setBootstrapServer(TestUtil::getHost() . ':' . TestUtil::getPort()); - $config->setSasl(TestUtil::getSasl()); + TestUtil::addConfigInfo($config); $config->setAcks(-1); $producer = new Producer($config); $producer->sendBatch([ diff --git a/tests/TestUtil.php b/tests/TestUtil.php index 9b2a3fd..672ac4a 100644 --- a/tests/TestUtil.php +++ b/tests/TestUtil.php @@ -7,6 +7,7 @@ use longlang\phpkafka\Client\ClientInterface; use longlang\phpkafka\Client\SyncClient; use longlang\phpkafka\Config\CommonConfig; +use longlang\phpkafka\Config\SslConfig; use longlang\phpkafka\Protocol\Metadata\MetadataRequest; use longlang\phpkafka\Protocol\Metadata\MetadataRequestTopic; use longlang\phpkafka\Protocol\Metadata\MetadataResponse; @@ -34,12 +35,27 @@ public static function getSasl(): array return json_decode($result, true); } + public static function getSsl(): SslConfig + { + $result = getenv('KAFKA_SSL') ?: '{}'; + + return new SslConfig(json_decode($result, true)); + } + + public static function addConfigInfo(CommonConfig $config): CommonConfig + { + $config->setSasl(self::getSasl()); + $config->setSsl(self::getSsl()); + + return $config; + } + public static function createKafkaClient(string $class = null): ClientInterface { $config = new CommonConfig(); $config->setSendTimeout(10); $config->setRecvTimeout(10); - $config->setSasl(self::getSasl()); + self::addConfigInfo($config); if (null === $class) { $class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class; } @@ -59,7 +75,7 @@ public static function getControllerClient(): ClientInterface $config = new CommonConfig(); $config->setSendTimeout(10); $config->setRecvTimeout(10); - $config->setSasl(self::getSasl()); + self::addConfigInfo($config); $class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class; $nodeId = $response->getControllerId(); foreach ($response->getBrokers() as $broker) { @@ -89,7 +105,7 @@ public static function getLeaderBrokerClient(string $topic, int $partition): Cli $config = new CommonConfig(); $config->setSendTimeout(10); $config->setRecvTimeout(10); - $config->setSasl(self::getSasl()); + self::addConfigInfo($config); $class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class; $nodeId = $partitionItem->getLeaderId(); foreach ($response->getBrokers() as $broker) {