Performance Tuning Guide¶
Optimize SteadyText for maximum performance, reduced latency, and efficient resource usage.
Overview¶
SteadyText performance optimization focuses on:
- Daemon mode: 160x faster first response (see the sketch below)
- Cache optimization: Hit rates of 95% or higher on repetitive workloads
- Batch processing: Amortize model loading costs
- Resource management: Memory and CPU optimization
- Concurrent operations: Thread-safe parallel processing
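The single biggest win is usually the daemon. A minimal sketch of opting into it from Python, using the same use_daemon() context manager shown later in this guide:

import steadytext
from steadytext.daemon import use_daemon

# All calls inside this block go through the long-running daemon,
# so the model is loaded once rather than once per process.
with use_daemon():
    text = steadytext.generate("Explain caching", seed=42)
    vector = steadytext.embed("Explain caching", seed=42)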
Table of Contents¶
- Performance Metrics
- Daemon Optimization
- Cache Tuning
- Model Performance
- Batch Processing
- Memory Management
- Concurrent Operations
- Monitoring and Profiling
- Production Optimization
- Benchmarking
- Best Practices
Performance Metrics¶
Key Metrics to Monitor¶
import time
import psutil
import steadytext
from steadytext import get_cache_manager
from dataclasses import dataclass
from typing import List, Dict, Any
import statistics
@dataclass
class PerformanceMetrics:
"""Performance measurement results."""
operation: str
latency_ms: float
throughput_tps: float
memory_mb: float
cache_hit: bool
cpu_percent: float
class PerformanceMonitor:
"""Monitor SteadyText performance metrics."""
def __init__(self):
self.metrics: List[PerformanceMetrics] = []
self.cache_manager = get_cache_manager()
self.process = psutil.Process()
def measure_operation(self, func, *args, **kwargs):
"""Measure performance of a single operation."""
# Get initial state
initial_mem = self.process.memory_info().rss / 1024 / 1024
initial_cache_stats = self.cache_manager.get_cache_stats()
# Measure CPU
self.process.cpu_percent() # Initialize
# Time the operation
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
# Calculate metrics
latency_ms = (end_time - start_time) * 1000
throughput_tps = 1000 / latency_ms
final_mem = self.process.memory_info().rss / 1024 / 1024
memory_delta = final_mem - initial_mem
cpu_percent = self.process.cpu_percent()
# Check cache hit
final_cache_stats = self.cache_manager.get_cache_stats()
cache_hit = self._detect_cache_hit(initial_cache_stats, final_cache_stats)
metric = PerformanceMetrics(
operation=func.__name__,
latency_ms=latency_ms,
throughput_tps=throughput_tps,
memory_mb=memory_delta,
cache_hit=cache_hit,
cpu_percent=cpu_percent
)
self.metrics.append(metric)
return result, metric
def _detect_cache_hit(self, initial: dict, final: dict) -> bool:
"""Detect if a cache hit occurred."""
for cache_type in ['generation', 'embedding']:
initial_hits = initial.get(cache_type, {}).get('hits', 0)
final_hits = final.get(cache_type, {}).get('hits', 0)
if final_hits > initial_hits:
return True
return False
def get_summary(self) -> Dict[str, Any]:
"""Get performance summary statistics."""
if not self.metrics:
return {}
latencies = [m.latency_ms for m in self.metrics]
throughputs = [m.throughput_tps for m in self.metrics]
memory_deltas = [m.memory_mb for m in self.metrics]
cpu_percents = [m.cpu_percent for m in self.metrics]
cache_hits = sum(1 for m in self.metrics if m.cache_hit)
return {
'operations': len(self.metrics),
'cache_hit_rate': cache_hits / len(self.metrics),
'latency': {
'mean': statistics.mean(latencies),
'median': statistics.median(latencies),
'p95': sorted(latencies)[int(len(latencies) * 0.95)],
'p99': sorted(latencies)[int(len(latencies) * 0.99)],
},
'throughput': {
'mean': statistics.mean(throughputs),
'total': sum(throughputs),
},
'memory': {
'total_mb': sum(memory_deltas),
'mean_mb': statistics.mean(memory_deltas),
},
'cpu': {
'mean_percent': statistics.mean(cpu_percents),
'max_percent': max(cpu_percents),
}
}
# Usage example
monitor = PerformanceMonitor()
# Measure generation performance
for i in range(100):
prompt = f"Test prompt {i}"
result, metric = monitor.measure_operation(
steadytext.generate,
prompt,
seed=42
)
print(f"Latency: {metric.latency_ms:.2f}ms, Cache: {metric.cache_hit}")
# Get summary
summary = monitor.get_summary()
print(f"\nPerformance Summary:")
print(f"Cache hit rate: {summary['cache_hit_rate']:.2%}")
print(f"Mean latency: {summary['latency']['mean']:.2f}ms")
print(f"P95 latency: {summary['latency']['p95']:.2f}ms")
Daemon Optimization¶
Startup Performance¶
import subprocess
import time
from typing import Optional
class DaemonOptimizer:
"""Optimize daemon startup and performance."""
@staticmethod
def start_daemon_with_profiling():
"""Start daemon with performance profiling."""
start_time = time.time()
# Start daemon
result = subprocess.run([
'st', 'daemon', 'start',
'--seed', '42'
], capture_output=True, text=True)
# Wait for daemon to be ready
ready = False
for _ in range(300):  # 30 second timeout (0.1s poll interval)
status = subprocess.run([
'st', 'daemon', 'status', '--json'
], capture_output=True, text=True)
if status.returncode == 0:
import json
data = json.loads(status.stdout)
if data.get('running'):
ready = True
break
time.sleep(0.1)
startup_time = time.time() - start_time
print(f"Daemon startup time: {startup_time:.2f}s")
return ready
@staticmethod
def benchmark_daemon_vs_direct():
"""Compare daemon vs direct performance."""
import steadytext
from steadytext.daemon import use_daemon
prompt = "Benchmark test prompt"
iterations = 50
# Benchmark direct access
print("Benchmarking direct access...")
direct_times = []
for _ in range(iterations):
start = time.perf_counter()
_ = steadytext.generate(prompt, seed=42)
direct_times.append(time.perf_counter() - start)
# Benchmark daemon access
print("Benchmarking daemon access...")
daemon_times = []
with use_daemon():
for _ in range(iterations):
start = time.perf_counter()
_ = steadytext.generate(prompt, seed=42)
daemon_times.append(time.perf_counter() - start)
# Calculate statistics
direct_avg = sum(direct_times) / len(direct_times) * 1000
daemon_avg = sum(daemon_times) / len(daemon_times) * 1000
speedup = direct_avg / daemon_avg
print(f"\nResults:")
print(f"Direct access: {direct_avg:.2f}ms average")
print(f"Daemon access: {daemon_avg:.2f}ms average")
print(f"Speedup: {speedup:.1f}x")
# First response comparison
print(f"\nFirst response:")
print(f"Direct: {direct_times[0]*1000:.2f}ms")
print(f"Daemon: {daemon_times[0]*1000:.2f}ms")
print(f"First response speedup: {direct_times[0]/daemon_times[0]:.1f}x")
Connection Pooling¶
import zmq
from contextlib import contextmanager
from threading import Lock
from typing import Dict, Any
class DaemonConnectionPool:
"""Connection pool for daemon clients."""
def __init__(self, host='127.0.0.1', port=5557, pool_size=10):
self.host = host
self.port = port
self.pool_size = pool_size
self.connections = []
self.available = []
self.lock = Lock()
self._initialize_pool()
def _initialize_pool(self):
"""Initialize connection pool."""
self.context = zmq.Context()  # keep a handle for clean shutdown
for _ in range(self.pool_size):
socket = self.context.socket(zmq.REQ)
socket.connect(f"tcp://{self.host}:{self.port}")
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 second timeout
self.connections.append(socket)
self.available.append(socket)
@contextmanager
def get_connection(self):
"""Get a connection from the pool."""
socket = None
try:
with self.lock:
if self.available:
socket = self.available.pop()
if socket is None:
raise RuntimeError("No connections available")
yield socket
finally:
if socket:
with self.lock:
self.available.append(socket)
def execute_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Execute request using pooled connection."""
import json
with self.get_connection() as socket:
socket.send_json(request)
response = socket.recv_json()
return response
# Usage
pool = DaemonConnectionPool(pool_size=20)
# Concurrent requests
import concurrent.futures
def make_request(i):
request = {
'type': 'generate',
'prompt': f'Test {i}',
'seed': 42
}
return pool.execute_request(request)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(make_request, i) for i in range(100)]
results = [f.result() for f in futures]
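The pool above is never torn down explicitly. A minimal cleanup sketch for a close() method you could add to DaemonConnectionPool (it assumes the self.context handle stored in _initialize_pool):

def close(self):
    """Close all pooled sockets and terminate the ZMQ context."""
    with self.lock:
        for socket in self.connections:
            socket.close(linger=0)
        self.connections.clear()
        self.available.clear()
    self.context.term()

# Call pool.close() on shutdown, e.g. register it with atexit.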
Cache Tuning¶
Optimal Cache Configuration¶
import os
from typing import Dict, Tuple
class CacheTuner:
"""Tune cache settings for optimal performance."""
@staticmethod
def calculate_optimal_settings(
available_memory_mb: float,
expected_qps: float,
avg_prompt_length: int,
cache_for_hours: float = 24
) -> Dict[str, Dict[str, float]]:
"""Calculate optimal cache settings based on workload."""
# Estimate cache entry sizes
gen_entry_size_kb = 2 + (avg_prompt_length * 0.001) # Rough estimate
embed_entry_size_kb = 4.2 # 1024 floats + metadata
# Calculate expected entries
expected_requests = expected_qps * 3600 * cache_for_hours
unique_ratio = 0.3 # Assume 30% unique requests
expected_unique = expected_requests * unique_ratio
# Allocate memory (70% for generation, 30% for embedding)
gen_memory_mb = available_memory_mb * 0.7
embed_memory_mb = available_memory_mb * 0.3
# Calculate capacities
gen_capacity = min(
int(gen_memory_mb * 1024 / gen_entry_size_kb),
int(expected_unique * 0.8) # 80% of expected unique
)
embed_capacity = min(
int(embed_memory_mb * 1024 / embed_entry_size_kb),
int(expected_unique * 0.5) # 50% of expected unique
)
return {
'generation': {
'capacity': gen_capacity,
'max_size_mb': gen_memory_mb
},
'embedding': {
'capacity': embed_capacity,
'max_size_mb': embed_memory_mb
}
}
@staticmethod
def apply_settings(settings: Dict[str, Dict[str, float]]):
"""Apply cache settings via environment variables."""
os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = str(
int(settings['generation']['capacity'])
)
os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = str(
settings['generation']['max_size_mb']
)
os.environ['STEADYTEXT_EMBEDDING_CACHE_CAPACITY'] = str(
int(settings['embedding']['capacity'])
)
os.environ['STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB'] = str(
settings['embedding']['max_size_mb']
)
print("Applied cache settings:")
print(f"Generation: {settings['generation']['capacity']} entries, "
f"{settings['generation']['max_size_mb']:.1f}MB")
print(f"Embedding: {settings['embedding']['capacity']} entries, "
f"{settings['embedding']['max_size_mb']:.1f}MB")
# Example usage
tuner = CacheTuner()
# For a server with 1GB available for caching, expecting 10 QPS
settings = tuner.calculate_optimal_settings(
available_memory_mb=1024,
expected_qps=10,
avg_prompt_length=100,
cache_for_hours=24
)
tuner.apply_settings(settings)
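These environment variables are generally read when the caches are first initialized, so set them early in process startup, before the first generate/embed call (or before starting the daemon). A quick sanity check; the field names follow the cache statistics used elsewhere in this guide:

from steadytext import get_cache_manager

# Confirm the caches are live and report their current entry counts.
stats = get_cache_manager().get_cache_stats()
print("Generation cache entries:", stats.get('generation', {}).get('size', 0))
print("Embedding cache entries:", stats.get('embedding', {}).get('size', 0))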
Cache Warming¶
import asyncio
from typing import List, Tuple
import steadytext
class CacheWarmer:
"""Warm up caches with common queries."""
def __init__(self, prompts: List[str], seeds: List[int] = None):
self.prompts = prompts
self.seeds = seeds or [42]
async def warm_generation_cache(self):
"""Warm generation cache asynchronously."""
tasks = []
for prompt in self.prompts:
for seed in self.seeds:
task = asyncio.create_task(
self._generate_async(prompt, seed)
)
tasks.append(task)
results = await asyncio.gather(*tasks)
successful = sum(1 for r in results if r is not None)
print(f"Warmed generation cache: {successful}/{len(tasks)} entries")
async def _generate_async(self, prompt: str, seed: int):
"""Generate text asynchronously."""
loop = asyncio.get_event_loop()
# Pass seed as a keyword argument so it is not bound to the wrong positional parameter
return await loop.run_in_executor(
None, lambda: steadytext.generate(prompt, seed=seed)
)
def warm_embedding_cache(self):
"""Warm embedding cache."""
successful = 0
for text in self.prompts:
for seed in self.seeds:
try:
_ = steadytext.embed(text, seed=seed)
successful += 1
except Exception as e:
print(f"Failed to warm embed cache: {e}")
print(f"Warmed embedding cache: {successful}/{len(self.prompts) * len(self.seeds)} entries")
# Common prompts for warming
COMMON_PROMPTS = [
"Explain machine learning",
"Write a Python function",
"What is artificial intelligence?",
"How does deep learning work?",
"Summarize this text:",
"Translate to Spanish:",
"Generate documentation for",
"Create a test case",
"Explain the error:",
"Optimize this code:"
]
# Warm caches on startup
async def warm_caches():
warmer = CacheWarmer(COMMON_PROMPTS, seeds=[42, 123, 456])
await warmer.warm_generation_cache()
warmer.warm_embedding_cache()
# Run warming
asyncio.run(warm_caches())
Model Performance¶
Model Size Selection¶
from typing import Dict, Any
import time
class ModelBenchmark:
"""Benchmark different model configurations."""
@staticmethod
def compare_model_sizes():
"""Compare performance of different model sizes."""
import subprocess
import json
test_prompts = [
"Write a short function",
"Explain quantum computing in simple terms",
"Generate a creative story about AI"
]
results = {}
for size in ['small', 'large']:
print(f"\nBenchmarking {size} model...")
size_results = {
'latencies': [],
'quality_scores': [],
'memory_usage': []
}
for prompt in test_prompts:
# Measure latency
start = time.perf_counter()
result = subprocess.run([
'st', 'generate', prompt,
'--size', size,
'--json',
'--wait'
], capture_output=True, text=True)
latency = time.perf_counter() - start
if result.returncode == 0:
data = json.loads(result.stdout)
size_results['latencies'].append(latency)
# Simple quality proxy: vocabulary diversity (type-token ratio)
words = data['text'].split()
quality = len(set(words)) / len(words) if words else 0.0
size_results['quality_scores'].append(quality)
results[size] = size_results
# Print comparison
print("\nModel Size Comparison:")
print("-" * 50)
for size, data in results.items():
avg_latency = sum(data['latencies']) / len(data['latencies'])
avg_quality = sum(data['quality_scores']) / len(data['quality_scores'])
print(f"{size.capitalize()} Model:")
print(f" Average latency: {avg_latency:.2f}s")
print(f" Quality score: {avg_quality:.3f}")
print(f" Latency range: {min(data['latencies']):.2f}s - {max(data['latencies']):.2f}s")
return results
Custom Model Configuration¶
class ModelOptimizer:
"""Optimize model loading and configuration."""
@staticmethod
def get_optimal_config(use_case: str) -> Dict[str, Any]:
"""Get optimal model configuration for use case."""
configs = {
'realtime': {
'model': 'small',
'n_threads': 4,
'n_batch': 8,
'context_length': 512,
'use_mlock': True,
'use_mmap': True
},
'quality': {
'model': 'large',
'n_threads': 8,
'n_batch': 16,
'context_length': 2048,
'use_mlock': True,
'use_mmap': True
},
'batch': {
'model': 'large',
'n_threads': 16,
'n_batch': 32,
'context_length': 1024,
'use_mlock': False,
'use_mmap': True
}
}
return configs.get(use_case, configs['realtime'])
@staticmethod
def optimize_for_hardware():
"""Detect hardware and optimize configuration."""
import psutil
# Get system info
cpu_count = psutil.cpu_count(logical=True)
memory_gb = psutil.virtual_memory().total / (1024**3)
# Determine optimal settings
if memory_gb >= 32 and cpu_count >= 16:
config = {
'profile': 'high-performance',
'model': 'large',
'n_threads': min(cpu_count - 2, 24),
'cache_size_mb': 2048
}
elif memory_gb >= 16 and cpu_count >= 8:
config = {
'profile': 'balanced',
'model': 'large',
'n_threads': min(cpu_count - 1, 12),
'cache_size_mb': 1024
}
else:
config = {
'profile': 'low-resource',
'model': 'small',
'n_threads': min(cpu_count, 4),
'cache_size_mb': 256
}
print(f"Hardware profile: {config['profile']}")
print(f"Detected: {cpu_count} CPUs, {memory_gb:.1f}GB RAM")
print(f"Recommended: {config['model']} model, {config['n_threads']} threads")
return config
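A short usage example; these configuration dictionaries are advisory, and applying the thread or mlock settings depends on how your deployment loads the underlying model:

# Pick a profile from detected hardware, or request one for a known use case.
hw_config = ModelOptimizer.optimize_for_hardware()
realtime_config = ModelOptimizer.get_optimal_config('realtime')
print(f"Auto-detected: {hw_config['model']} model, {hw_config['n_threads']} threads")
print(f"Realtime preset: {realtime_config}")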
Batch Processing¶
Efficient Batch Operations¶
from typing import List, Dict, Any
import concurrent.futures
import asyncio
import steadytext
class BatchProcessor:
"""Process multiple requests efficiently."""
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
def process_batch_sync(
self,
prompts: List[str],
seeds: List[int] = None
) -> List[str]:
"""Process batch synchronously with thread pool."""
if seeds is None:
seeds = [42] * len(prompts)
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(steadytext.generate, prompt, seed=seed)
for prompt, seed in zip(prompts, seeds)
]
# Collect results in input order (as_completed would reorder them)
results = []
for future in futures:
try:
results.append(future.result())
except Exception as e:
results.append(f"Error: {e}")
return results
async def process_batch_async(
self,
prompts: List[str],
seeds: List[int] = None
) -> List[str]:
"""Process batch asynchronously."""
if seeds is None:
seeds = [42] * len(prompts)
tasks = []
for prompt, seed in zip(prompts, seeds):
task = asyncio.create_task(
self._generate_async(prompt, seed)
)
tasks.append(task)
return await asyncio.gather(*tasks)
async def _generate_async(self, prompt: str, seed: int) -> str:
"""Generate text asynchronously."""
loop = asyncio.get_event_loop()
# Pass seed as a keyword argument so it is not bound to the wrong positional parameter
return await loop.run_in_executor(
None, lambda: steadytext.generate(prompt, seed=seed)
)
def process_streaming_batch(
self,
prompts: List[str],
callback: callable
):
"""Process batch with streaming results."""
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
future_to_prompt = {
executor.submit(steadytext.generate, prompt, seed=42): prompt
for prompt in prompts
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_prompt):
prompt = future_to_prompt[future]
try:
result = future.result()
callback(prompt, result, None)
except Exception as e:
callback(prompt, None, e)
# Usage example
processor = BatchProcessor(max_workers=8)
# Sync batch processing
prompts = ["Explain " + topic for topic in ["AI", "ML", "DL", "NLP"]]
results = processor.process_batch_sync(prompts)
# Async batch processing
async def async_example():
results = await processor.process_batch_async(prompts)
for prompt, result in zip(prompts, results):
print(f"{prompt}: {len(result)} chars")
# Streaming results
def handle_result(prompt, result, error):
if error:
print(f"Error for '{prompt}': {error}")
else:
print(f"Completed '{prompt}': {len(result)} chars")
processor.process_streaming_batch(prompts, handle_result)
Pipeline Optimization¶
class Pipeline:
"""Optimized processing pipeline."""
def __init__(self):
self.stages = []
def add_stage(self, func, name=None):
"""Add processing stage."""
self.stages.append({
'func': func,
'name': name or func.__name__
})
return self
async def process(self, items: List[Any]) -> List[Any]:
"""Process items through pipeline."""
current = items
for stage in self.stages:
print(f"Processing stage: {stage['name']}")
# Process stage in parallel
tasks = []
for item in current:
task = asyncio.create_task(
self._process_item(stage['func'], item)
)
tasks.append(task)
current = await asyncio.gather(*tasks)
return current
async def _process_item(self, func, item):
"""Process single item."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, item)
# Example: Text processing pipeline
async def text_pipeline_example():
# Define stages
def clean_text(text):
return text.strip().lower()
def generate_summary(text):
prompt = f"Summarize in one sentence: {text}"
return steadytext.generate(prompt, seed=42)
def extract_keywords(summary):
prompt = f"Extract 3 keywords from: {summary}"
return steadytext.generate(prompt, seed=123)
# Build pipeline
pipeline = Pipeline()
pipeline.add_stage(clean_text, "Clean")
pipeline.add_stage(generate_summary, "Summarize")
pipeline.add_stage(extract_keywords, "Keywords")
# Process texts
texts = [
"Machine learning is transforming industries...",
"Artificial intelligence enables computers...",
"Deep learning uses neural networks..."
]
results = await pipeline.process(texts)
return results
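To run the pipeline example end to end:

# Each text passes through clean -> summarize -> keywords.
keyword_results = asyncio.run(text_pipeline_example())
for keywords in keyword_results:
    print(keywords)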
Memory Management¶
Memory Optimization Strategies¶
import gc
import tracemalloc
from typing import List, Dict, Any
import steadytext
class MemoryOptimizer:
"""Optimize memory usage for SteadyText operations."""
def __init__(self):
self.snapshots = []
def start_profiling(self):
"""Start memory profiling."""
tracemalloc.start()
self.snapshots = []
def take_snapshot(self, label: str):
"""Take memory snapshot."""
snapshot = tracemalloc.take_snapshot()
self.snapshots.append((label, snapshot))
def get_memory_report(self) -> str:
"""Generate memory usage report."""
if len(self.snapshots) < 2:
return "Not enough snapshots for comparison"
report = []
for i in range(1, len(self.snapshots)):
label1, snap1 = self.snapshots[i-1]
label2, snap2 = self.snapshots[i]
diff = snap2.compare_to(snap1, 'lineno')
report.append(f"\n{label1} -> {label2}:")
for stat in diff[:10]: # Top 10 differences
report.append(f" {stat}")
return "\n".join(report)
@staticmethod
def optimize_batch_memory(items: List[Any], batch_size: int = 100):
"""Process items in batches to control memory."""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Process batch
batch_results = [
steadytext.generate(item, seed=42)
for item in batch
]
results.extend(batch_results)
# Force garbage collection after each batch
gc.collect()
return results
@staticmethod
def memory_efficient_streaming(prompts: List[str]):
"""Memory-efficient streaming generation."""
for i, prompt in enumerate(prompts):
# Generate and yield immediately
result = steadytext.generate(prompt, seed=42)
yield result
# Clear any references
del result
# Periodic garbage collection (enumerate avoids repeated O(n) index lookups)
if i % 100 == 0:
gc.collect()
# Example usage
optimizer = MemoryOptimizer()
optimizer.start_profiling()
# Take initial snapshot
optimizer.take_snapshot("Initial")
# Generate some text
texts = []
for i in range(1000):
text = steadytext.generate(f"Test {i}", seed=42)
texts.append(text)
optimizer.take_snapshot("After 1000 generations")
# Clear and collect
texts.clear()
gc.collect()
optimizer.take_snapshot("After cleanup")
# Get report
print(optimizer.get_memory_report())
Resource Limits¶
import resource
import signal
from contextlib import contextmanager
import steadytext
class ResourceLimiter:
"""Set resource limits for operations."""
@staticmethod
@contextmanager
def limit_memory(max_memory_mb: int):
"""Limit memory usage (Unix only)."""
# Convert MB to bytes
max_memory = max_memory_mb * 1024 * 1024
# Save the current limits and lower only the soft limit: an unprivileged
# process cannot raise its hard limit again once it has been reduced
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (max_memory, hard))
try:
yield
finally:
# Restore the original soft limit
resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
@staticmethod
@contextmanager
def timeout(seconds: int):
"""Set operation timeout."""
def timeout_handler(signum, frame):
raise TimeoutError(f"Operation timed out after {seconds} seconds")
# Set handler
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
# Usage
limiter = ResourceLimiter()
# Limit memory usage
try:
with limiter.limit_memory(1024): # 1GB limit
# Memory-intensive operation
results = [
steadytext.generate(f"Prompt {i}", seed=42)
for i in range(10000)
]
except MemoryError:
print("Memory limit exceeded")
# Set timeout
try:
with limiter.timeout(5): # 5 second timeout
result = steadytext.generate("Complex prompt", seed=42)
except TimeoutError:
print("Operation timed out")
Concurrent Operations¶
Thread-Safe Operations¶
import threading
from queue import Queue
from typing import List, Tuple, Any
import steadytext
class ConcurrentProcessor:
"""Thread-safe concurrent processing."""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.input_queue = Queue()
self.output_queue = Queue()
self.workers = []
self.running = False
def start(self):
"""Start worker threads."""
self.running = True
for i in range(self.num_workers):
worker = threading.Thread(
target=self._worker,
name=f"Worker-{i}"
)
worker.daemon = True
worker.start()
self.workers.append(worker)
def stop(self):
"""Stop all workers."""
self.running = False
# Add stop signals
for _ in range(self.num_workers):
self.input_queue.put(None)
# Wait for workers
for worker in self.workers:
worker.join()
def _worker(self):
"""Worker thread function."""
while self.running:
item = self.input_queue.get()
if item is None:
break
prompt, seed, request_id = item
try:
result = steadytext.generate(prompt, seed=seed)
self.output_queue.put((request_id, result, None))
except Exception as e:
self.output_queue.put((request_id, None, e))
self.input_queue.task_done()
def process_concurrent(
self,
prompts: List[str],
seeds: List[int] = None
) -> List[Tuple[int, Any, Any]]:
"""Process prompts concurrently."""
if seeds is None:
seeds = [42] * len(prompts)
# Add all items to queue
for i, (prompt, seed) in enumerate(zip(prompts, seeds)):
self.input_queue.put((prompt, seed, i))
# Collect results
results = []
for _ in range(len(prompts)):
result = self.output_queue.get()
results.append(result)
# Sort by request ID
results.sort(key=lambda x: x[0])
return results
# Usage
processor = ConcurrentProcessor(num_workers=8)
processor.start()
# Process requests
prompts = [f"Generate text about topic {i}" for i in range(100)]
results = processor.process_concurrent(prompts)
# Check results
successful = sum(1 for _, result, error in results if error is None)
print(f"Processed {successful}/{len(prompts)} successfully")
processor.stop()
Async Concurrency¶
import asyncio
from typing import List, Dict, Any
import steadytext
class AsyncConcurrentProcessor:
"""Asynchronous concurrent processing."""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = {}
async def process_with_limit(
self,
prompt: str,
seed: int,
request_id: int
):
"""Process with concurrency limit."""
async with self.semaphore:
result = await self._generate_async(prompt, seed)
self.results[request_id] = result
return result
async def _generate_async(self, prompt: str, seed: int) -> str:
"""Async generation wrapper."""
loop = asyncio.get_event_loop()
# Pass seed as a keyword argument so it is not bound to the wrong positional parameter
return await loop.run_in_executor(
None, lambda: steadytext.generate(prompt, seed=seed)
)
async def process_batch_limited(
self,
prompts: List[str],
seeds: List[int] = None
) -> List[str]:
"""Process batch with concurrency limit."""
if seeds is None:
seeds = [42] * len(prompts)
tasks = []
for i, (prompt, seed) in enumerate(zip(prompts, seeds)):
task = self.process_with_limit(prompt, seed, i)
tasks.append(task)
await asyncio.gather(*tasks)
# Return results in order
return [self.results[i] for i in range(len(prompts))]
# Usage
async def concurrent_example():
processor = AsyncConcurrentProcessor(max_concurrent=20)
# Generate 100 prompts
prompts = [f"Explain concept {i}" for i in range(100)]
start_time = asyncio.get_event_loop().time()
results = await processor.process_batch_limited(prompts)
end_time = asyncio.get_event_loop().time()
print(f"Processed {len(results)} prompts in {end_time - start_time:.2f}s")
print(f"Average: {(end_time - start_time) / len(results):.3f}s per prompt")
# Run
asyncio.run(concurrent_example())
Monitoring and Profiling¶
Performance Dashboard¶
import time
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, Any
import threading
import steadytext
@dataclass
class MetricPoint:
"""Single metric data point."""
timestamp: float
value: float
labels: Dict[str, str]
class PerformanceDashboard:
"""Real-time performance monitoring dashboard."""
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.metrics: Dict[str, Deque[MetricPoint]] = {}
self.lock = threading.Lock()
def record_metric(self, name: str, value: float, labels: Dict[str, str] = None):
"""Record a metric value."""
with self.lock:
if name not in self.metrics:
self.metrics[name] = deque(maxlen=self.window_size)
point = MetricPoint(
timestamp=time.time(),
value=value,
labels=labels or {}
)
self.metrics[name].append(point)
def get_stats(self, name: str, window_seconds: float = 60) -> Dict[str, float]:
"""Get statistics for a metric."""
with self.lock:
if name not in self.metrics:
return {}
current_time = time.time()
cutoff_time = current_time - window_seconds
# Filter points within window
points = [
p.value for p in self.metrics[name]
if p.timestamp >= cutoff_time
]
if not points:
return {}
return {
'count': len(points),
'mean': sum(points) / len(points),
'min': min(points),
'max': max(points),
'rate': len(points) / window_seconds
}
def print_dashboard(self):
"""Print performance dashboard."""
print("\n" + "="*60)
print("SteadyText Performance Dashboard")
print("="*60)
for metric_name in sorted(self.metrics.keys()):
stats = self.get_stats(metric_name)
if stats:
print(f"\n{metric_name}:")
print(f" Rate: {stats['rate']:.2f}/s")
print(f" Mean: {stats['mean']:.2f}")
print(f" Range: {stats['min']:.2f} - {stats['max']:.2f}")
# Global dashboard instance
dashboard = PerformanceDashboard()
# Instrumented generation function
def monitored_generate(prompt: str, seed: int = 42) -> str:
"""Generate with monitoring."""
start_time = time.perf_counter()
try:
result = steadytext.generate(prompt, seed=seed)
latency = (time.perf_counter() - start_time) * 1000
dashboard.record_metric('generation_latency_ms', latency)
dashboard.record_metric('generation_success', 1)
return result
except Exception as e:
dashboard.record_metric('generation_error', 1)
raise
# Background monitoring thread
def monitor_system():
"""Monitor system metrics."""
import psutil
while True:
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
dashboard.record_metric('cpu_percent', cpu_percent)
# Memory usage
memory = psutil.virtual_memory()
dashboard.record_metric('memory_percent', memory.percent)
dashboard.record_metric('memory_mb', memory.used / 1024 / 1024)
# Print dashboard every 10 seconds
time.sleep(10)
dashboard.print_dashboard()
# Start monitoring
monitor_thread = threading.Thread(target=monitor_system, daemon=True)
monitor_thread.start()
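A short usage example for the instrumented wrapper and dashboard:

# Route calls through the wrapper so latency and success metrics are recorded,
# then print the dashboard on demand.
for i in range(20):
    monitored_generate(f"Describe topic {i}")
dashboard.print_dashboard()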
Production Optimization¶
Production Configuration¶
from typing import Dict, Any
import yaml
class ProductionConfig:
"""Production-optimized configuration."""
@staticmethod
def generate_config(environment: str = 'production') -> Dict[str, Any]:
"""Generate environment-specific configuration."""
configs = {
'development': {
'daemon': {
'enabled': False,
'host': '127.0.0.1',
'port': 5557
},
'cache': {
'generation_capacity': 256,
'generation_max_size_mb': 50,
'embedding_capacity': 512,
'embedding_max_size_mb': 100
},
'models': {
'default_size': 'small',
'preload': False
},
'monitoring': {
'enabled': True,
'verbose': True
}
},
'staging': {
'daemon': {
'enabled': True,
'host': '0.0.0.0',
'port': 5557,
'workers': 4
},
'cache': {
'generation_capacity': 1024,
'generation_max_size_mb': 200,
'embedding_capacity': 2048,
'embedding_max_size_mb': 400
},
'models': {
'default_size': 'large',
'preload': True
},
'monitoring': {
'enabled': True,
'verbose': False
}
},
'production': {
'daemon': {
'enabled': True,
'host': '0.0.0.0',
'port': 5557,
'workers': 16,
'max_connections': 1000
},
'cache': {
'generation_capacity': 4096,
'generation_max_size_mb': 1024,
'embedding_capacity': 8192,
'embedding_max_size_mb': 2048
},
'models': {
'default_size': 'large',
'preload': True,
'mlock': True
},
'monitoring': {
'enabled': True,
'verbose': False,
'metrics_endpoint': '/metrics'
},
'security': {
'rate_limiting': True,
'max_requests_per_minute': 600,
'require_auth': True
}
}
}
return configs.get(environment, configs['production'])
@staticmethod
def save_config(config: Dict[str, Any], filename: str):
"""Save configuration to file."""
with open(filename, 'w') as f:
yaml.dump(config, f, default_flow_style=False)
@staticmethod
def apply_config(config: Dict[str, Any]):
"""Apply configuration to environment."""
import os
# Apply cache settings (both generation and embedding caches)
cache = config.get('cache', {})
os.environ['STEADYTEXT_GENERATION_CACHE_CAPACITY'] = str(
cache.get('generation_capacity', 256)
)
os.environ['STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB'] = str(
cache.get('generation_max_size_mb', 50)
)
os.environ['STEADYTEXT_EMBEDDING_CACHE_CAPACITY'] = str(
cache.get('embedding_capacity', 512)
)
os.environ['STEADYTEXT_EMBEDDING_CACHE_MAX_SIZE_MB'] = str(
cache.get('embedding_max_size_mb', 100)
)
# Apply model settings
models = config.get('models', {})
if models.get('preload'):
import subprocess
subprocess.run(['st', 'models', 'preload'], check=True)
print(f"Applied configuration for environment")
# Generate and apply production config
config = ProductionConfig.generate_config('production')
ProductionConfig.save_config(config, 'steadytext-prod.yaml')
ProductionConfig.apply_config(config)
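At service start you would typically load the saved file back and re-apply it; a minimal sketch using yaml.safe_load and the file name from the example above:

import yaml

# Reload the configuration written above and apply it at startup.
with open('steadytext-prod.yaml') as f:
    loaded = yaml.safe_load(f)
ProductionConfig.apply_config(loaded)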
Health Checks¶
import asyncio
import time
from enum import Enum
from typing import Dict, Any, List
import steadytext
from steadytext import get_cache_manager
class HealthStatus(Enum):
"""Health check status."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class HealthChecker:
"""Production health checking."""
def __init__(self):
self.checks = {}
async def check_daemon_health(self) -> Dict[str, Any]:
"""Check daemon health."""
import subprocess
import json
try:
result = subprocess.run([
'st', 'daemon', 'status', '--json'
], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
data = json.loads(result.stdout)
return {
'status': HealthStatus.HEALTHY if data.get('running') else HealthStatus.UNHEALTHY,
'details': data
}
else:
return {
'status': HealthStatus.UNHEALTHY,
'error': 'Daemon not responding'
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'error': str(e)
}
async def check_model_health(self) -> Dict[str, Any]:
"""Check model availability."""
try:
# Quick generation test
start = time.time()
result = steadytext.generate("health check", seed=42)
latency = time.time() - start
if result and latency < 5.0:
status = HealthStatus.HEALTHY
elif result and latency < 10.0:
status = HealthStatus.DEGRADED
else:
status = HealthStatus.UNHEALTHY
return {
'status': status,
'latency': latency,
'model_loaded': result is not None
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'error': str(e)
}
async def check_cache_health(self) -> Dict[str, Any]:
"""Check cache health."""
try:
cache_manager = get_cache_manager()
stats = cache_manager.get_cache_stats()
# Check if caches are responsive
gen_size = stats.get('generation', {}).get('size', 0)
embed_size = stats.get('embedding', {}).get('size', 0)
return {
'status': HealthStatus.HEALTHY,
'generation_cache_size': gen_size,
'embedding_cache_size': embed_size
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'error': str(e)
}
async def run_all_checks(self) -> Dict[str, Any]:
"""Run all health checks."""
checks = {
'daemon': self.check_daemon_health(),
'model': self.check_model_health(),
'cache': self.check_cache_health()
}
results = {}
for name, check in checks.items():
results[name] = await check
# Overall status
statuses = [r['status'] for r in results.values()]
if all(s == HealthStatus.HEALTHY for s in statuses):
overall = HealthStatus.HEALTHY
elif any(s == HealthStatus.UNHEALTHY for s in statuses):
overall = HealthStatus.UNHEALTHY
else:
overall = HealthStatus.DEGRADED
return {
'status': overall.value,
'checks': results,
'timestamp': time.time()
}
# Health check endpoint
async def health_endpoint():
"""Health check endpoint for monitoring."""
checker = HealthChecker()
result = await checker.run_all_checks()
# Return appropriate HTTP status
if result['status'] == 'healthy':
return result, 200
elif result['status'] == 'degraded':
return result, 200
else:
return result, 503
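How this endpoint is exposed depends on your web stack; a minimal sketch assuming FastAPI is installed (the route path and app name are illustrative):

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/health")
async def health(response: Response):
    # Reuse the checker above; return 503 only when the overall status is unhealthy.
    checker = HealthChecker()
    result = await checker.run_all_checks()
    if result["status"] == HealthStatus.UNHEALTHY.value:
        response.status_code = 503
    return result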
Benchmarking¶
Comprehensive Benchmark Suite¶
import json
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Any
import steadytext
class BenchmarkSuite:
"""Comprehensive performance benchmarking."""
def __init__(self, output_dir: str = "./benchmarks"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def run_latency_benchmark(self) -> Dict[str, Any]:
"""Benchmark operation latencies."""
results = {
'generation': [],
'embedding': []
}
# Test different prompt lengths
prompt_lengths = [10, 50, 100, 500, 1000]
for length in prompt_lengths:
prompt = " ".join(["word"] * length)
# Generation latency
start = time.perf_counter()
_ = steadytext.generate(prompt, seed=42)
gen_latency = (time.perf_counter() - start) * 1000
# Embedding latency
start = time.perf_counter()
_ = steadytext.embed(prompt, seed=42)
embed_latency = (time.perf_counter() - start) * 1000
results['generation'].append({
'prompt_length': length,
'latency_ms': gen_latency
})
results['embedding'].append({
'text_length': length,
'latency_ms': embed_latency
})
return results
def run_throughput_benchmark(self, duration_seconds: int = 60) -> Dict[str, Any]:
"""Benchmark throughput over time."""
# The benchmark window is split evenly between generation and embedding
results = {
'generation': {'requests': 0, 'duration': duration_seconds / 2},
'embedding': {'requests': 0, 'duration': duration_seconds / 2}
}
# Generation throughput
start_time = time.time()
gen_count = 0
while time.time() - start_time < duration_seconds / 2:
_ = steadytext.generate(f"Test {gen_count}", seed=42)
gen_count += 1
results['generation']['requests'] = gen_count
results['generation']['rps'] = gen_count / (duration_seconds / 2)
# Embedding throughput
start_time = time.time()
embed_count = 0
while time.time() - start_time < duration_seconds / 2:
_ = steadytext.embed(f"Test {embed_count}", seed=42)
embed_count += 1
results['embedding']['requests'] = embed_count
results['embedding']['rps'] = embed_count / (duration_seconds / 2)
return results
def run_cache_benchmark(self) -> Dict[str, Any]:
"""Benchmark cache performance."""
from steadytext import get_cache_manager
cache_manager = get_cache_manager()
results = {'before': {}, 'after': {}}
# Clear caches
cache_manager.clear_all_caches()
# Get initial stats
results['before'] = cache_manager.get_cache_stats()
# Generate cache misses
miss_times = []
for i in range(100):
start = time.perf_counter()
_ = steadytext.generate(f"Unique prompt {i}", seed=42)
miss_times.append((time.perf_counter() - start) * 1000)
# Generate cache hits
hit_times = []
for i in range(100):
start = time.perf_counter()
_ = steadytext.generate(f"Unique prompt {i}", seed=42)
hit_times.append((time.perf_counter() - start) * 1000)
# Get final stats
results['after'] = cache_manager.get_cache_stats()
results['performance'] = {
'miss_latency_avg': sum(miss_times) / len(miss_times),
'hit_latency_avg': sum(hit_times) / len(hit_times),
'speedup': sum(miss_times) / sum(hit_times)
}
return results
def run_full_benchmark(self) -> Dict[str, Any]:
"""Run complete benchmark suite."""
print("Running SteadyText Performance Benchmark Suite...")
results = {
'timestamp': time.time(),
'latency': self.run_latency_benchmark(),
'throughput': self.run_throughput_benchmark(30),
'cache': self.run_cache_benchmark()
}
# Save results
output_file = self.output_dir / f"benchmark_{int(time.time())}.json"
with open(output_file, 'w') as f:
json.dump(results, f, indent=2)
print(f"Benchmark complete. Results saved to {output_file}")
# Print summary
self.print_summary(results)
return results
def print_summary(self, results: Dict[str, Any]):
"""Print benchmark summary."""
print("\n" + "="*60)
print("Benchmark Summary")
print("="*60)
# Latency summary
gen_latencies = [r['latency_ms'] for r in results['latency']['generation']]
print(f"\nGeneration Latency:")
print(f" Min: {min(gen_latencies):.2f}ms")
print(f" Max: {max(gen_latencies):.2f}ms")
print(f" Avg: {sum(gen_latencies)/len(gen_latencies):.2f}ms")
# Throughput summary
print(f"\nThroughput:")
print(f" Generation: {results['throughput']['generation']['rps']:.2f} req/s")
print(f" Embedding: {results['throughput']['embedding']['rps']:.2f} req/s")
# Cache summary
cache_perf = results['cache']['performance']
print(f"\nCache Performance:")
print(f" Miss latency: {cache_perf['miss_latency_avg']:.2f}ms")
print(f" Hit latency: {cache_perf['hit_latency_avg']:.2f}ms")
print(f" Speedup: {cache_perf['speedup']:.1f}x")
# Run benchmarks
if __name__ == "__main__":
suite = BenchmarkSuite()
suite.run_full_benchmark()
Best Practices¶
Performance Checklist¶
- Always use daemon mode for production deployments
- Configure caches based on workload and available memory
- Use appropriate model sizes - small for real-time, large for quality
- Batch operations when processing multiple items
- Monitor performance continuously in production
- Set resource limits to prevent runaway processes
- Use connection pooling for high-concurrency scenarios
- Implement health checks for production monitoring
- Profile regularly to identify bottlenecks
- Optimize for your hardware - use all available cores
Quick Optimization Guide¶
# 1. Start daemon for 160x faster responses
st daemon start
# 2. Preload models to avoid first-request delay
st models preload
# 3. Configure optimal cache sizes
export STEADYTEXT_GENERATION_CACHE_CAPACITY=2048
export STEADYTEXT_GENERATION_CACHE_MAX_SIZE_MB=500
# 4. Use batch processing in your code
# 5. Monitor with built-in tools
st cache --status
# 6. Run benchmarks to validate
python benchmarks/run_all_benchmarks.py --quick
Common Pitfalls¶
Performance Pitfalls to Avoid
- Not using daemon mode - 160x slower first requests
- Cache thrashing - Set appropriate capacity limits
- Memory leaks - Use batch processing with cleanup
- Thread contention - Limit concurrent operations
- Inefficient prompts - Keep prompts concise
- Ignoring monitoring - Always track performance metrics