fix(astrolabe): add pagination and psalm fixes for token refresh
- Add pagination to getAllUsersWithTokens() with limit/offset params - Update RefreshUserTokens to process users in batches of 100 - Add lock TTL documentation to withTokenLock() docstring - Fix psalm type errors in getAccessToken() method - Add unit tests for pagination and batched processing Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
+23
-12
@@ -38,6 +38,9 @@ class RefreshUserTokens extends TimedJob {
|
||||
/** Default assumed token lifetime if we can't determine it (1 hour) */
|
||||
private const DEFAULT_TOKEN_LIFETIME_SECONDS = 3600;
|
||||
|
||||
/** Batch size for processing users (prevents memory issues on large installations) */
|
||||
private const BATCH_SIZE = 100;
|
||||
|
||||
public function __construct(
|
||||
ITimeFactory $time,
|
||||
private McpTokenStorage $tokenStorage,
|
||||
@@ -52,23 +55,31 @@ class RefreshUserTokens extends TimedJob {
|
||||
protected function run(mixed $argument): void {
|
||||
$this->logger->info('RefreshUserTokens: Starting background token refresh');
|
||||
|
||||
$userIds = $this->tokenStorage->getAllUsersWithTokens();
|
||||
$this->logger->debug('RefreshUserTokens: Found ' . count($userIds) . ' users with tokens');
|
||||
|
||||
$refreshed = 0;
|
||||
$failed = 0;
|
||||
$skipped = 0;
|
||||
$offset = 0;
|
||||
$totalUsers = 0;
|
||||
|
||||
foreach ($userIds as $userId) {
|
||||
$result = $this->refreshUserTokenIfNeeded($userId);
|
||||
match ($result) {
|
||||
'refreshed' => $refreshed++,
|
||||
'failed' => $failed++,
|
||||
'skipped' => $skipped++,
|
||||
};
|
||||
}
|
||||
// Process users in batches to prevent memory issues on large installations
|
||||
do {
|
||||
$userIds = $this->tokenStorage->getAllUsersWithTokens(self::BATCH_SIZE, $offset);
|
||||
$batchCount = count($userIds);
|
||||
$totalUsers += $batchCount;
|
||||
|
||||
$this->logger->info("RefreshUserTokens: Complete - refreshed=$refreshed, failed=$failed, skipped=$skipped");
|
||||
foreach ($userIds as $userId) {
|
||||
$result = $this->refreshUserTokenIfNeeded($userId);
|
||||
match ($result) {
|
||||
'refreshed' => $refreshed++,
|
||||
'failed' => $failed++,
|
||||
'skipped' => $skipped++,
|
||||
};
|
||||
}
|
||||
|
||||
$offset += self::BATCH_SIZE;
|
||||
} while ($batchCount === self::BATCH_SIZE);
|
||||
|
||||
$this->logger->info("RefreshUserTokens: Complete - total=$totalUsers, refreshed=$refreshed, failed=$failed, skipped=$skipped");
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+24
-5
@@ -156,6 +156,10 @@ class McpTokenStorage {
|
||||
*
|
||||
* Prevents race conditions between background job and on-demand token refresh.
|
||||
*
|
||||
* Note: Lock TTL is configured at the Nextcloud server level (default: 3600s).
|
||||
* If a process crashes while holding the lock, it will auto-expire after the TTL.
|
||||
* The ILockingProvider interface does not support per-call timeouts.
|
||||
*
|
||||
* @template T
|
||||
* @param string $userId User ID
|
||||
* @param callable(): T $callback
|
||||
@@ -198,20 +202,29 @@ class McpTokenStorage {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all user IDs that have OAuth tokens stored.
|
||||
* Get user IDs that have OAuth tokens stored.
|
||||
*
|
||||
* Queries oc_preferences directly since IConfig doesn't support
|
||||
* listing all users with a specific key set.
|
||||
*
|
||||
* @param int $limit Maximum users to return (0 = no limit, for backward compatibility)
|
||||
* @param int $offset Starting offset for pagination
|
||||
* @return list<string> Array of user IDs
|
||||
*/
|
||||
public function getAllUsersWithTokens(): array {
|
||||
public function getAllUsersWithTokens(int $limit = 0, int $offset = 0): array {
|
||||
$qb = $this->db->getQueryBuilder();
|
||||
$qb->select('userid')
|
||||
->from('preferences')
|
||||
->where($qb->expr()->eq('appid', $qb->createNamedParameter('astrolabe')))
|
||||
->andWhere($qb->expr()->eq('configkey', $qb->createNamedParameter('oauth_tokens')));
|
||||
|
||||
if ($limit > 0) {
|
||||
$qb->setMaxResults($limit);
|
||||
}
|
||||
if ($offset > 0) {
|
||||
$qb->setFirstResult($offset);
|
||||
}
|
||||
|
||||
$result = $qb->executeQuery();
|
||||
/** @var list<string> $userIds */
|
||||
$userIds = [];
|
||||
@@ -255,19 +268,23 @@ class McpTokenStorage {
|
||||
|
||||
// Token expired - acquire lock for refresh
|
||||
try {
|
||||
/** @var string|null */
|
||||
/**
|
||||
* @return string|null
|
||||
* @psalm-suppress MixedInferredReturnType
|
||||
*/
|
||||
return $this->withTokenLock($userId, function () use ($userId, $refreshCallback): ?string {
|
||||
// Re-check after acquiring lock (double-check pattern)
|
||||
// Another process may have refreshed while we waited for the lock
|
||||
$currentToken = $this->getUserToken($userId);
|
||||
|
||||
if (!$currentToken) {
|
||||
if ($currentToken === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Check if another process already refreshed the token
|
||||
if (!$this->isExpired($currentToken)) {
|
||||
$this->logger->debug("Token already refreshed for user $userId while waiting for lock");
|
||||
/** @var string */
|
||||
return $currentToken['access_token'];
|
||||
}
|
||||
|
||||
@@ -322,7 +339,9 @@ class McpTokenStorage {
|
||||
// Could not acquire lock - another process is refreshing
|
||||
// Return stale token rather than failing - caller can retry if needed
|
||||
$this->logger->warning("Could not acquire token lock for user $userId, returning stale token");
|
||||
return $token['access_token'] ?? null;
|
||||
/** @var string|null $staleToken */
|
||||
$staleToken = $token['access_token'] ?? null;
|
||||
return $staleToken;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -79,14 +79,11 @@ final class RefreshUserTokensTest extends TestCase {
|
||||
if ($callCount === 1) {
|
||||
$this->assertStringContainsString('Starting', $message);
|
||||
} else {
|
||||
$this->assertStringContainsString('total=0', $message);
|
||||
$this->assertStringContainsString('refreshed=0, failed=0, skipped=0', $message);
|
||||
}
|
||||
});
|
||||
|
||||
$this->logger->expects($this->once())
|
||||
->method('debug')
|
||||
->with($this->stringContains('Found 0 users'));
|
||||
|
||||
// Call run() via reflection since it's protected
|
||||
$this->invokeRun();
|
||||
}
|
||||
@@ -151,6 +148,7 @@ final class RefreshUserTokensTest extends TestCase {
|
||||
static $callCount = 0;
|
||||
$callCount++;
|
||||
if ($callCount === 2) {
|
||||
$this->assertStringContainsString('total=3', $message);
|
||||
$this->assertStringContainsString('refreshed=1, failed=1, skipped=1', $message);
|
||||
}
|
||||
});
|
||||
@@ -158,6 +156,63 @@ final class RefreshUserTokensTest extends TestCase {
|
||||
$this->invokeRun();
|
||||
}
|
||||
|
||||
public function testRunProcessesUsersInBatches(): void {
|
||||
$this->setupDefaultLockBehavior();
|
||||
|
||||
// Simulate 150 users processed in 2 batches (100 + 50)
|
||||
$batch1 = array_map(fn ($i) => "user{$i}", range(1, 100));
|
||||
$batch2 = array_map(fn ($i) => "user{$i}", range(101, 150));
|
||||
|
||||
$callCount = 0;
|
||||
$this->tokenStorage->method('getAllUsersWithTokens')
|
||||
->willReturnCallback(function (int $limit, int $offset) use (&$callCount, $batch1, $batch2) {
|
||||
$callCount++;
|
||||
// First call: offset 0, return 100 users (full batch)
|
||||
if ($offset === 0) {
|
||||
$this->assertEquals(100, $limit);
|
||||
return $batch1;
|
||||
}
|
||||
// Second call: offset 100, return 50 users (partial batch = last)
|
||||
if ($offset === 100) {
|
||||
$this->assertEquals(100, $limit);
|
||||
return $batch2;
|
||||
}
|
||||
// Should not be called again
|
||||
$this->fail("Unexpected getAllUsersWithTokens call with offset $offset");
|
||||
});
|
||||
|
||||
// All tokens have plenty of time (all skipped)
|
||||
$this->tokenStorage->method('getUserToken')
|
||||
->willReturnCallback(function (string $userId) {
|
||||
$now = time();
|
||||
return [
|
||||
'access_token' => "{$userId}-token",
|
||||
'refresh_token' => "{$userId}-refresh",
|
||||
'expires_at' => $now + 3600,
|
||||
'issued_at' => $now,
|
||||
];
|
||||
});
|
||||
|
||||
$this->tokenRefresher->expects($this->never())
|
||||
->method('refreshAccessToken');
|
||||
|
||||
$this->logger->expects($this->exactly(2))
|
||||
->method('info')
|
||||
->willReturnCallback(function (string $message) {
|
||||
static $infoCallCount = 0;
|
||||
$infoCallCount++;
|
||||
if ($infoCallCount === 2) {
|
||||
$this->assertStringContainsString('total=150', $message);
|
||||
$this->assertStringContainsString('refreshed=0, failed=0, skipped=150', $message);
|
||||
}
|
||||
});
|
||||
|
||||
$this->invokeRun();
|
||||
|
||||
// Verify getAllUsersWithTokens was called exactly twice (2 batches)
|
||||
$this->assertEquals(2, $callCount);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// refreshUserTokenIfNeeded() Tests
|
||||
// =========================================================================
|
||||
|
||||
@@ -750,4 +750,80 @@ final class McpTokenStorageTest extends TestCase {
|
||||
|
||||
$this->assertEquals([], $userIds);
|
||||
}
|
||||
|
||||
public function testGetAllUsersWithTokensWithLimitAndOffset(): void {
|
||||
$qb = $this->createMock(IQueryBuilder::class);
|
||||
$expr = $this->createMock(IExpressionBuilder::class);
|
||||
$result = $this->createMock(IResult::class);
|
||||
|
||||
// Chain builder methods
|
||||
$qb->method('select')->willReturnSelf();
|
||||
$qb->method('from')->willReturnSelf();
|
||||
$qb->method('where')->willReturnSelf();
|
||||
$qb->method('andWhere')->willReturnSelf();
|
||||
$qb->method('expr')->willReturn($expr);
|
||||
$qb->method('createNamedParameter')->willReturnArgument(0);
|
||||
$qb->method('executeQuery')->willReturn($result);
|
||||
|
||||
// Verify setMaxResults and setFirstResult are called with correct values
|
||||
$qb->expects($this->once())
|
||||
->method('setMaxResults')
|
||||
->with(50)
|
||||
->willReturnSelf();
|
||||
$qb->expects($this->once())
|
||||
->method('setFirstResult')
|
||||
->with(100)
|
||||
->willReturnSelf();
|
||||
|
||||
// Mock expression builder
|
||||
$expr->method('eq')->willReturn('mocked_condition');
|
||||
|
||||
// Mock result set
|
||||
$result->method('fetch')->willReturnOnConsecutiveCalls(
|
||||
['userid' => 'user1'],
|
||||
['userid' => 'user2'],
|
||||
false
|
||||
);
|
||||
$result->expects($this->once())->method('closeCursor');
|
||||
|
||||
$this->db->method('getQueryBuilder')->willReturn($qb);
|
||||
|
||||
$userIds = $this->storage->getAllUsersWithTokens(50, 100);
|
||||
|
||||
$this->assertEquals(['user1', 'user2'], $userIds);
|
||||
}
|
||||
|
||||
public function testGetAllUsersWithTokensWithZeroLimitDoesNotSetMaxResults(): void {
|
||||
$qb = $this->createMock(IQueryBuilder::class);
|
||||
$expr = $this->createMock(IExpressionBuilder::class);
|
||||
$result = $this->createMock(IResult::class);
|
||||
|
||||
// Chain builder methods
|
||||
$qb->method('select')->willReturnSelf();
|
||||
$qb->method('from')->willReturnSelf();
|
||||
$qb->method('where')->willReturnSelf();
|
||||
$qb->method('andWhere')->willReturnSelf();
|
||||
$qb->method('expr')->willReturn($expr);
|
||||
$qb->method('createNamedParameter')->willReturnArgument(0);
|
||||
$qb->method('executeQuery')->willReturn($result);
|
||||
|
||||
// setMaxResults should NOT be called when limit is 0
|
||||
$qb->expects($this->never())
|
||||
->method('setMaxResults');
|
||||
|
||||
// setFirstResult should NOT be called when offset is 0
|
||||
$qb->expects($this->never())
|
||||
->method('setFirstResult');
|
||||
|
||||
// Mock expression builder
|
||||
$expr->method('eq')->willReturn('mocked_condition');
|
||||
|
||||
// Mock result set
|
||||
$result->method('fetch')->willReturn(false);
|
||||
$result->expects($this->once())->method('closeCursor');
|
||||
|
||||
$this->db->method('getQueryBuilder')->willReturn($qb);
|
||||
|
||||
$this->storage->getAllUsersWithTokens(0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user