Skip to content

Commit

Permalink
refactor: package controller & package version controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
hezhengxu2018 committed Jun 19, 2024
1 parent 948dad5 commit 3ca36ff
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 173 deletions.
66 changes: 23 additions & 43 deletions app/common/adapter/NPMRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
HttpClientRequestOptions,
HttpClientResponse,
} from 'egg';
import { ABBREVIATED_META_TYPE } from '../constants';
import { PackageManifestType } from '../../repository/PackageRepository';

type HttpMethod = HttpClientRequestOptions['method'];

Expand Down Expand Up @@ -41,26 +41,30 @@ export class NPMRegistry {
this.registryHost = registryHost;
}

public async getFullManifests(fullname: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}): Promise<RegistryResponse> {
public async getFullManifests(fullname: string, optionalConfig?: { retries?: number, remoteAuthToken?: string }): Promise<{ method: HttpMethod } & HttpClientResponse<PackageManifestType>> {
let retries = optionalConfig?.retries || 3;
// set query t=timestamp, make sure CDN cache disable
// cache=0 is sync worker request flag
const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getManifest(url, optionalConfig);
}

public async getAbbreviatedManifests(fullname: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}): Promise<RegistryResponse> {
const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getManifest(url, { ...optionalConfig, isAbbreviated: true });
}

public async getPackageVersionManifest(fullname: string, versionOrTag: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}) {
const url = `${this.registry}/${encodeURIComponent(fullname)}/${versionOrTag}`;
return await this.getManifest(url, optionalConfig);
}

public async getAbbreviatedPackageVersionManifest(fullname: string, versionOrTag: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}) {
const url = `${this.registry}/${encodeURIComponent(fullname)}/${versionOrTag}`;
return await this.getManifest(url, { ...optionalConfig, isAbbreviated: true });
let lastError: any;
while (retries > 0) {
try {
// large package: https://r.cnpmjs.org/%40procore%2Fcore-icons
// https://r.cnpmjs.org/intraactive-sdk-ui 44s
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
return await this.request('GET', url, undefined, { timeout: 120000, headers: { authorization } });
} catch (err: any) {
if (err.name === 'ResponseTimeoutError') throw err;
lastError = err;
}
retries--;
if (retries > 0) {
// sleep 1s ~ 4s in random
const delay = process.env.NODE_ENV === 'test' ? 1 : 1000 + Math.random() * 4000;
await setTimeout(delay);
}
}
throw lastError;
}

// app.put('/:name/sync', sync.sync);
Expand Down Expand Up @@ -107,31 +111,7 @@ export class NPMRegistry {
};
}

private genAuthorizationHeader(remoteAuthToken?:string) {
public genAuthorizationHeader(remoteAuthToken?:string) {
return remoteAuthToken ? `Bearer ${remoteAuthToken}` : '';
}

private async getManifest(url, optionalConfig?: {retries?:number, remoteAuthToken?:string, isAbbreviated?:boolean}) {
let retries = optionalConfig?.retries || 3;
let lastError: any;
while (retries > 0) {
try {
// large package: https://r.cnpmjs.org/%40procore%2Fcore-icons
// https://r.cnpmjs.org/intraactive-sdk-ui 44s
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
const accept = optionalConfig?.isAbbreviated ? ABBREVIATED_META_TYPE : '';
return await this.request('GET', url, undefined, { timeout: 120000, headers: { authorization, accept } });
} catch (err: any) {
if (err.name === 'ResponseTimeoutError') throw err;
lastError = err;
}
retries--;
if (retries > 0) {
// sleep 1s ~ 4s in random
const delay = process.env.NODE_ENV === 'test' ? 1 : 1000 + Math.random() * 4000;
await setTimeout(delay);
}
}
throw lastError;
}
}
218 changes: 115 additions & 103 deletions app/core/service/ProxyCacheService.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { EggHttpClient, HttpClientResponse } from 'egg';
import { InternalServerError, ForbiddenError, HttpError, NotFoundError } from 'egg-errors';
import { SingletonProto, AccessLevel, Inject } from '@eggjs/tegg';
import { EggHttpClient, HttpClientRequestOptions, HttpClientResponse } from 'egg';
import { ForbiddenError } from 'egg-errors';
import { SingletonProto, AccessLevel, Inject, EggContext } from '@eggjs/tegg';
import { BackgroundTaskHelper } from '@eggjs/tegg-background-task';
import { valid as semverValid } from 'semver';
import { AbstractService } from '../../common/AbstractService';
import { TaskService } from './TaskService';
import { CacheService } from './CacheService';
import { RegistryManagerService } from './RegistryManagerService';
import { NPMRegistry } from '../../common/adapter/NPMRegistry';
import { NFSAdapter } from '../../common/adapter/NFSAdapter';
import { ProxyCache } from '../entity/ProxyCache';
import { Task, UpdateProxyCacheTaskOptions, CreateUpdateProxyCacheTask } from '../entity/Task';
import { ProxyCacheRepository } from '../../repository/ProxyCacheRepository';
import { TaskType, TaskState } from '../../common/enum/Task';
import { calculateIntegrity } from '../../common/PackageUtil';
import { PROXY_CACHE_DIR_NAME } from '../../common/constants';
import { ABBREVIATED_META_TYPE, PROXY_CACHE_DIR_NAME } from '../../common/constants';
import { DIST_NAMES } from '../entity/Package';
import type { AbbreviatedPackageManifestType, AbbreviatedPackageJSONType, PackageManifestType, PackageJSONType } from '../../repository/PackageRepository';

Expand All @@ -25,11 +26,9 @@ export function isPkgManifest(fileType: DIST_NAMES) {
return fileType === DIST_NAMES.FULL_MANIFESTS || fileType === DIST_NAMES.ABBREVIATED_MANIFESTS;
}

type GetSourceManifestAndCacheReturnType<T> = {
proxyBytes: Buffer,
manifest: T extends DIST_NAMES.ABBREVIATED | DIST_NAMES.MANIFEST ? AbbreviatedPackageJSONType | PackageJSONType :
T extends DIST_NAMES.FULL_MANIFESTS | DIST_NAMES.ABBREVIATED_MANIFESTS ? AbbreviatedPackageManifestType|PackageManifestType : never;
};
type GetSourceManifestAndCacheReturnType<T> = T extends DIST_NAMES.ABBREVIATED | DIST_NAMES.MANIFEST ? AbbreviatedPackageJSONType | PackageJSONType :
T extends DIST_NAMES.FULL_MANIFESTS | DIST_NAMES.ABBREVIATED_MANIFESTS ? AbbreviatedPackageManifestType|PackageManifestType : never;


@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
Expand All @@ -44,47 +43,31 @@ export class ProxyCacheService extends AbstractService {
@Inject()
private readonly proxyCacheRepository: ProxyCacheRepository;
@Inject()
private readonly registryManagerService: RegistryManagerService;
@Inject()
private readonly taskService: TaskService;
@Inject()
private readonly cacheService: CacheService;
@Inject()
private readonly backgroundTaskHelper:BackgroundTaskHelper;

async getPackageVersionTarResponse(fullname: string, url: string): Promise<HttpClientResponse> {
async getPackageVersionTarResponse(fullname: string, ctx: EggContext): Promise<HttpClientResponse> {
if (this.config.cnpmcore.syncPackageBlockList.includes(fullname)) {
throw new ForbiddenError(`stop proxy by block list: ${JSON.stringify(this.config.cnpmcore.syncPackageBlockList)}`);
}
const requestTgzURL = `${this.npmRegistry.registry}${url}`;
return await this.httpclient.request(requestTgzURL, {
timeout: 60000 * 10,
streaming: true,
timing: true,
followRedirect: true,
}) as HttpClientResponse;
return await this.getProxyResponse(ctx);
}

async getPackageManifest(fullname: string, fileType: DIST_NAMES.FULL_MANIFESTS| DIST_NAMES.ABBREVIATED_MANIFESTS): Promise<AbbreviatedPackageManifestType|PackageManifestType> {
const cachedStoreKey = (await this.proxyCacheRepository.findProxyCache(fullname, fileType))?.filePath;
if (cachedStoreKey) {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
if (nfsBytes) {
let nfsPkgManifgest;
try {
const nfsString = Buffer.from(nfsBytes).toString();
nfsPkgManifgest = JSON.parse(nfsString);
} catch {
// JSON parse error
await this.nfsAdapter.remove(cachedStoreKey);
await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
throw new InternalServerError('manifest JSON in NFS parse error');
}
return nfsPkgManifgest;
}
await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
throw new NotFoundError('can not found manifest in NFS.');
const nfsString = Buffer.from(nfsBytes!).toString();
const nfsPkgManifgest = JSON.parse(nfsString);
return nfsPkgManifgest;
}

const { manifest } = await this.getSourceManifestAndCache<typeof fileType>(fullname, fileType);
const manifest = await this.getSourceManifestAndCache<typeof fileType>(fullname, fileType);
this.backgroundTaskHelper.run(async () => {
const cachedFiles = ProxyCache.create({ fullname, fileType });
await this.proxyCacheRepository.saveProxyCache(cachedFiles);
Expand All @@ -105,82 +88,17 @@ export class ProxyCacheService extends AbstractService {
const cachedStoreKey = (await this.proxyCacheRepository.findProxyCache(fullname, fileType, version))?.filePath;
if (cachedStoreKey) {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
if (nfsBytes) {
try {
const nfsString = Buffer.from(nfsBytes).toString();
return JSON.parse(nfsString) as PackageJSONType | AbbreviatedPackageJSONType;
} catch {
// JSON parse error
await this.nfsAdapter.remove(cachedStoreKey);
await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
throw new InternalServerError('manifest in NFS JSON parse error');
}
}
const nfsString = Buffer.from(nfsBytes!).toString();
return JSON.parse(nfsString) as PackageJSONType | AbbreviatedPackageJSONType;
}
const { manifest } = await this.getSourceManifestAndCache(fullname, fileType, versionOrTag);
const manifest = await this.getSourceManifestAndCache(fullname, fileType, versionOrTag);
this.backgroundTaskHelper.run(async () => {
const cachedFiles = ProxyCache.create({ fullname, fileType, version });
await this.proxyCacheRepository.saveProxyCache(cachedFiles);
});
return manifest;
}

async getSourceManifestAndCache<T extends DIST_NAMES>(fullname:string, fileType: T, versionOrTag?:string): Promise<GetSourceManifestAndCacheReturnType<T>> {
let responseResult;
switch (fileType) {
case DIST_NAMES.FULL_MANIFESTS:
responseResult = await this.npmRegistry.getFullManifests(fullname);
break;
case DIST_NAMES.ABBREVIATED_MANIFESTS:
responseResult = await this.npmRegistry.getAbbreviatedManifests(fullname);
break;
case DIST_NAMES.MANIFEST:
responseResult = await this.npmRegistry.getPackageVersionManifest(fullname, versionOrTag!);
break;
case DIST_NAMES.ABBREVIATED:
responseResult = await this.npmRegistry.getAbbreviatedPackageVersionManifest(fullname, versionOrTag!);
break;
default:
break;
}
if (responseResult.status !== 200) {
throw new HttpError({
status: responseResult.status,
message: responseResult.data?.error || responseResult.statusText,
});
}

// replace tarball url
const manifest = responseResult.data;
const { sourceRegistry, registry } = this.config.cnpmcore;
if (isPkgManifest(fileType)) {
// pkg manifest
const versionMap = manifest.versions || {};
for (const key in versionMap) {
const versionItem = versionMap[key];
if (versionItem?.dist?.tarball) {
versionItem.dist.tarball = versionItem.dist.tarball.replace(sourceRegistry, registry);
}
}
} else {
// pkg version manifest
const distItem = manifest.dist || {};
if (distItem.tarball) {
distItem.tarball = distItem.tarball.replace(sourceRegistry, registry);
}
}
const proxyBytes = Buffer.from(JSON.stringify(manifest));
let storeKey: string;
if (isPkgManifest(fileType)) {
storeKey = `/${PROXY_CACHE_DIR_NAME}/${fullname}/${fileType}`;
} else {
const version = manifest.version;
storeKey = `/${PROXY_CACHE_DIR_NAME}/${fullname}/${version}/${fileType}`;
}
await this.nfsAdapter.uploadBytes(storeKey, proxyBytes);
return { proxyBytes, manifest };
}

async removeProxyCache(fullname: string, fileType: DIST_NAMES, version?: string) {
const storeKey = isPkgManifest(fileType)
? `/${PROXY_CACHE_DIR_NAME}/${fullname}/${fileType}`
Expand All @@ -201,13 +119,13 @@ export class ProxyCacheService extends AbstractService {
const logs: string[] = [];
const fullname = (task as CreateUpdateProxyCacheTask).data.fullname;
const { fileType, version } = (task as CreateUpdateProxyCacheTask).data;
let cacheBytes;
let cachedManifest;
logs.push(`[${isoNow()}] 🚧🚧🚧🚧🚧 Start update "${fullname}-${fileType}" 🚧🚧🚧🚧🚧`);
try {
if (isPkgManifest(fileType)) {
const cachedFiles = await this.proxyCacheRepository.findProxyCache(fullname, fileType);
if (!cachedFiles) throw new Error('task params error, can not found record in repo.');
cacheBytes = (await this.getSourceManifestAndCache<typeof fileType>(fullname, fileType)).proxyBytes;
cachedManifest = await this.getSourceManifestAndCache<typeof fileType>(fullname, fileType);
ProxyCache.update(cachedFiles);
await this.proxyCacheRepository.saveProxyCache(cachedFiles);
} else {
Expand All @@ -232,11 +150,105 @@ export class ProxyCacheService extends AbstractService {
const isFullManifests = fileType === DIST_NAMES.FULL_MANIFESTS;
const cachedKey = await this.cacheService.getPackageEtag(fullname, isFullManifests);
if (cachedKey) {
const cacheBytes = Buffer.from(JSON.stringify(cachedManifest));
const { shasum: etag } = await calculateIntegrity(cacheBytes);
await this.cacheService.savePackageEtagAndManifests(fullname, isFullManifests, etag, cacheBytes);
logs.push(`[${isoNow()}] 🟢 Update Cache Success.`);
}
await this.taskService.finishTask(task, TaskState.Success, logs.join('\n'));
}

private async getSourceManifestAndCache<T extends DIST_NAMES>(fullname:string, fileType: T, versionOrTag?:string): Promise<GetSourceManifestAndCacheReturnType<T>> {
let responseResult;
switch (fileType) {
case DIST_NAMES.FULL_MANIFESTS:
responseResult = await this.getUpstreamFullManifests(fullname);
break;
case DIST_NAMES.ABBREVIATED_MANIFESTS:
responseResult = await this.getUpstreamAbbreviatedManifests(fullname);
break;
case DIST_NAMES.MANIFEST:
responseResult = await this.getUpstreamPackageVersionManifest(fullname, versionOrTag!);
break;
case DIST_NAMES.ABBREVIATED:
responseResult = await this.getUpstreamAbbreviatedPackageVersionManifest(fullname, versionOrTag!);
break;
default:
break;
}

// replace tarball url
const manifest = responseResult.data;
const { sourceRegistry, registry } = this.config.cnpmcore;
if (isPkgManifest(fileType)) {
// pkg manifest
const versionMap = manifest.versions || {};
for (const key in versionMap) {
const versionItem = versionMap[key];
if (versionItem?.dist?.tarball) {
versionItem.dist.tarball = versionItem.dist.tarball.replace(sourceRegistry, registry);
}
}
} else {
// pkg version manifest
const distItem = manifest.dist || {};
if (distItem.tarball) {
distItem.tarball = distItem.tarball.replace(sourceRegistry, registry);
}
}
let storeKey: string;
if (isPkgManifest(fileType)) {
storeKey = `/${PROXY_CACHE_DIR_NAME}/${fullname}/${fileType}`;
} else {
const version = manifest.version;
storeKey = `/${PROXY_CACHE_DIR_NAME}/${fullname}/${version}/${fileType}`;
}
await this.nfsAdapter.uploadFile(storeKey, JSON.stringify(manifest));
return manifest;
}

private async getProxyResponse(ctx: Partial<EggContext>, options?: HttpClientRequestOptions): Promise<HttpClientResponse> {
const registry = this.npmRegistry.registry;
const remoteAuthToken = await this.registryManagerService.getAuthTokenByRegistryHost(registry);
const authorization = this.npmRegistry.genAuthorizationHeader(remoteAuthToken);

const url = `${this.npmRegistry.registry}${ctx.url}`;

const res = await this.httpclient.request(url, {
timing: true,
followRedirect: true,
retry: 3,
dataType: 'stream',
timeout: 10000,
compressed: true,
...options,
headers: {
...ctx.headers,
authorization,
'x-forwarded-for': ctx?.ip,
via: `1.1, ${this.config.cnpmcore.registry}`,
},
}) as HttpClientResponse;
this.logger.info('[ProxyCacheService:getProxyStreamResponse] %s, status: %s', url, res.status);
return res;
}

private async getUpstreamFullManifests(fullname: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url }, { dataType: 'json' });
}

private async getUpstreamAbbreviatedManifests(fullname: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE } }, { dataType: 'json' });
}
private async getUpstreamPackageVersionManifest(fullname: string, versionOrTag: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname + '/' + versionOrTag)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url }, { dataType: 'json' });
}
private async getUpstreamAbbreviatedPackageVersionManifest(fullname: string, versionOrTag: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname + '/' + versionOrTag)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE } }, { dataType: 'json' });
}

}
Loading

0 comments on commit 3ca36ff

Please sign in to comment.